未验证 提交 71918623 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #7556 from taosdata/fix/query

Fix/query
...@@ -36,7 +36,7 @@ extern "C" { ...@@ -36,7 +36,7 @@ extern "C" {
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE)) (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \ #define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo))) (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ #define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
......
...@@ -35,6 +35,7 @@ typedef struct SCompareParam { ...@@ -35,6 +35,7 @@ typedef struct SCompareParam {
static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) { static bool needToMerge(SSDataBlock* pBlock, SArray* columnIndexList, int32_t index, char **buf) {
int32_t ret = 0; int32_t ret = 0;
size_t size = taosArrayGetSize(columnIndexList); size_t size = taosArrayGetSize(columnIndexList);
if (size > 0) { if (size > 0) {
ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC); ret = compare_aRv(pBlock, columnIndexList, (int32_t) size, index, buf, TSDB_ORDER_ASC);
...@@ -564,9 +565,11 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc ...@@ -564,9 +565,11 @@ static void savePrevOrderColumns(char** prevRow, SArray* pColumnList, SSDataBloc
(*hasPrev) = true; (*hasPrev) = true;
} }
// tsdb_func_tag function only produce one row of result. Therefore, we need to copy the
// output value to multiple rows
static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) { static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput, int32_t numOfRows) {
if (numOfRows <= 1) { if (numOfRows <= 1) {
return ; return;
} }
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
...@@ -574,12 +577,49 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput ...@@ -574,12 +577,49 @@ static void setTagValueForMultipleRows(SQLFunctionCtx* pCtx, int32_t numOfOutput
continue; continue;
} }
int32_t inc = numOfRows - 1; // tsdb_func_tag function only produce one row of result char* src = pCtx[k].pOutput;
char* src = pCtx[k].pOutput; char* dst = pCtx[k].pOutput + pCtx[k].outputBytes;
for (int32_t i = 0; i < inc; ++i) { // Let's start from the second row, as the first row has result value already.
pCtx[k].pOutput += pCtx[k].outputBytes; for (int32_t i = 1; i < numOfRows; ++i) {
memcpy(pCtx[k].pOutput, src, (size_t)pCtx[k].outputBytes); memcpy(dst, src, (size_t)pCtx[k].outputBytes);
dst += pCtx[k].outputBytes;
}
}
}
static void doMergeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr, int32_t rowIndex, char** pDataPtr) {
for (int32_t j = 0; j < numOfExpr; ++j) {
pCtx[j].pInput = pDataPtr[j] + pCtx[j].inputBytes * rowIndex;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
} else {
aAggs[functionId].mergeFunc(&pCtx[j]);
}
}
}
static void doFinalizeResultImpl(SMultiwayMergeInfo* pInfo, SQLFunctionCtx *pCtx, int32_t numOfExpr) {
for(int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
} else {
aAggs[functionId].xFinalize(&pCtx[j]);
} }
} }
} }
...@@ -588,52 +628,18 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -588,52 +628,18 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
SMultiwayMergeInfo* pInfo = pOperator->info; SMultiwayMergeInfo* pInfo = pOperator->info;
SQLFunctionCtx* pCtx = pInfo->binfo.pCtx; SQLFunctionCtx* pCtx = pInfo->binfo.pCtx;
char** add = calloc(pBlock->info.numOfCols, POINTER_BYTES); char** addrPtr = calloc(pBlock->info.numOfCols, POINTER_BYTES);
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
add[i] = pCtx[i].pInput; addrPtr[i] = pCtx[i].pInput;
pCtx[i].size = 1; pCtx[i].size = 1;
} }
for(int32_t i = 0; i < pBlock->info.rows; ++i) { for(int32_t i = 0; i < pBlock->info.rows; ++i) {
if (pInfo->hasPrev) { if (pInfo->hasPrev) {
if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) { if (needToMerge(pBlock, pInfo->orderColumnList, i, pInfo->prevRow)) {
for (int32_t j = 0; j < numOfExpr; ++j) { doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
} else { } else {
for(int32_t j = 0; j < numOfExpr; ++j) { // TODO refactor doFinalizeResultImpl(pInfo, pCtx, numOfExpr);
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
}
aAggs[functionId].xFinalize(&pCtx[j]);
}
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput); int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput);
setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows); setTagValueForMultipleRows(pCtx, pOperator->numOfOutput, numOfRows);
...@@ -655,48 +661,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -655,48 +661,10 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo); aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
} }
for (int32_t j = 0; j < numOfExpr; ++j) { doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
} }
} else { } else {
for (int32_t j = 0; j < numOfExpr; ++j) { doMergeResultImpl(pInfo, pCtx, numOfExpr, i, addrPtr);
pCtx[j].pInput = add[j] + pCtx[j].inputBytes * i;
}
for (int32_t j = 0; j < numOfExpr; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]);
}
} }
savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev); savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, i, &pInfo->hasPrev);
...@@ -704,11 +672,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -704,11 +672,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
{ {
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
pCtx[i].pInput = add[i]; pCtx[i].pInput = addrPtr[i];
} }
} }
tfree(add); tfree(addrPtr);
} }
static bool isAllSourcesCompleted(SGlobalMerger *pMerger) { static bool isAllSourcesCompleted(SGlobalMerger *pMerger) {
...@@ -816,6 +784,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) { ...@@ -816,6 +784,8 @@ SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup) {
SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index]; SLocalDataSource *pOneDataSrc = pMerger->pLocalDataSrc[pTree->pNode[0].index];
bool sameGroup = true; bool sameGroup = true;
if (pInfo->hasPrev) { if (pInfo->hasPrev) {
// todo refactor extract method
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList); int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
// if this row belongs to current result set group // if this row belongs to current result set group
...@@ -955,9 +925,10 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -955,9 +925,10 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
break; break;
} }
bool sameGroup = true;
if (pAggInfo->hasGroupColData) { if (pAggInfo->hasGroupColData) {
bool sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData); sameGroup = isSameGroup(pAggInfo->groupColumnList, pBlock, pAggInfo->currentGroupColData);
if (!sameGroup) { if (!sameGroup && !pAggInfo->multiGroupResults) {
*newgroup = true; *newgroup = true;
pAggInfo->hasDataBlockForNewGroup = true; pAggInfo->hasDataBlockForNewGroup = true;
pAggInfo->pExistBlock = pBlock; pAggInfo->pExistBlock = pBlock;
...@@ -976,26 +947,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -976,26 +947,11 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
} }
if (handleData) { // data in current group is all handled if (handleData) { // data in current group is all handled
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { doFinalizeResultImpl(pAggInfo, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
int32_t functionId = pAggInfo->binfo.pCtx[j].functionId;
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
}
aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]);
}
int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput); int32_t numOfRows = getNumOfResult(pOperator->pRuntimeEnv, pAggInfo->binfo.pCtx, pOperator->numOfOutput);
pAggInfo->binfo.pRes->info.rows += numOfRows;
pAggInfo->binfo.pRes->info.rows += numOfRows;
setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows); setTagValueForMultipleRows(pAggInfo->binfo.pCtx, pOperator->numOfOutput, numOfRows);
} }
...@@ -1019,71 +975,127 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -1019,71 +975,127 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
return (pRes->info.rows != 0)? pRes:NULL; return (pRes->info.rows != 0)? pRes:NULL;
} }
static SSDataBlock* skipGroupBlock(SOperatorInfo* pOperator, bool* newgroup) { static void doHandleDataInCurrentGroup(SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock, int32_t rowIndex) {
SSLimitOperatorInfo *pInfo = pOperator->info; if (pInfo->currentOffset > 0) {
assert(pInfo->currentGroupOffset >= 0); pInfo->currentOffset -= 1;
} else {
// discard the data rows in current group
if (pInfo->limit.limit < 0 || (pInfo->limit.limit >= 0 && pInfo->rowsTotal < pInfo->limit.limit)) {
size_t num1 = taosArrayGetSize(pInfo->pRes->pDataBlock);
for (int32_t i = 0; i < num1; ++i) {
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SColumnInfoData *pDstInfoData = taosArrayGet(pInfo->pRes->pDataBlock, i);
SSDataBlock* pBlock = NULL; SColumnInfo *pColInfo = &pColInfoData->info;
if (pInfo->currentGroupOffset == 0) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); char *pSrc = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); char *pDst = (char *)pDstInfoData->pData + (pInfo->pRes->info.rows * pColInfo->bytes);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
}
if (*newgroup == false && pInfo->limit.limit > 0 && pInfo->rowsTotal >= pInfo->limit.limit) { memcpy(pDst, pSrc, pColInfo->bytes);
while ((*newgroup) == false) { // ignore the remain blocks
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
} }
pInfo->rowsTotal += 1;
pInfo->pRes->info.rows += 1;
} }
}
}
static void ensureOutputBuf(SSLimitOperatorInfo * pInfo, SSDataBlock *pResultBlock, int32_t numOfRows) {
if (pInfo->capacity < pResultBlock->info.rows + numOfRows) {
int32_t total = pResultBlock->info.rows + numOfRows;
size_t num = taosArrayGetSize(pResultBlock->pDataBlock);
for (int32_t i = 0; i < num; ++i) {
SColumnInfoData *pInfoData = taosArrayGet(pResultBlock->pDataBlock, i);
char *tmp = realloc(pInfoData->pData, total * pInfoData->info.bytes);
if (tmp != NULL) {
pInfoData->pData = tmp;
} else {
// todo handle the malloc failure
}
return pBlock; pInfo->capacity = total;
pInfo->threshold = (int64_t) (total * 0.8);
}
} }
}
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); enum {
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); BLOCK_NEW_GROUP = 1,
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); BLOCK_NO_GROUP = 2,
BLOCK_SAME_GROUP = 3,
};
if (pBlock == NULL) { static int32_t doSlimitImpl(SOperatorInfo* pOperator, SSLimitOperatorInfo* pInfo, SSDataBlock* pBlock) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); int32_t rowIndex = 0;
pOperator->status = OP_EXEC_DONE;
return NULL; while (rowIndex < pBlock->info.rows) {
int32_t numOfCols = (int32_t)taosArrayGetSize(pInfo->orderColumnList);
bool samegroup = true;
if (pInfo->hasPrev) {
for (int32_t i = 0; i < numOfCols; ++i) {
SColIndex *pIndex = taosArrayGet(pInfo->orderColumnList, i);
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, pIndex->colIndex);
SColumnInfo *pColInfo = &pColInfoData->info;
char *d = rowIndex * pColInfo->bytes + (char *)pColInfoData->pData;
int32_t ret = columnValueAscendingComparator(pInfo->prevRow[i], d, pColInfo->type, pColInfo->bytes);
if (ret != 0) { // it is a new group
samegroup = false;
break;
}
}
} }
while(1) { if (!samegroup || !pInfo->hasPrev) {
if (*newgroup) { pInfo->ignoreCurrentGroup = false;
pInfo->currentGroupOffset -= 1; savePrevOrderColumns(pInfo->prevRow, pInfo->orderColumnList, pBlock, rowIndex, &pInfo->hasPrev);
*newgroup = false;
pInfo->currentOffset = pInfo->limit.offset; // reset the offset value for a new group
pInfo->rowsTotal = 0;
if (pInfo->currentGroupOffset > 0) {
pInfo->ignoreCurrentGroup = true;
pInfo->currentGroupOffset -= 1; // now we are in the next group data
rowIndex += 1;
continue;
}
// A new group has arrived according to the result rows, and the group limitation has already reached.
// Let's jump out of current loop and return immediately.
if (pInfo->slimit.limit >= 0 && pInfo->groupTotal >= pInfo->slimit.limit) {
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return BLOCK_NO_GROUP;
} }
while ((*newgroup) == false) { pInfo->groupTotal += 1;
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
if (pBlock == NULL) { // data in current group not allowed, return if current result does not belong to the previous group.And there
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); // are results exists in current SSDataBlock
pOperator->status = OP_EXEC_DONE; if (!pInfo->multigroupResult && !samegroup && pInfo->pRes->info.rows > 0) {
return NULL; return BLOCK_NEW_GROUP;
}
} }
// now we have got the first data block of the next group. doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
if (pInfo->currentGroupOffset == 0) {
return pBlock; } else { // handle the offset in the same group
// All the data in current group needs to be discarded, due to the limit parameter in the SQL statement
if (pInfo->ignoreCurrentGroup) {
rowIndex += 1;
continue;
} }
doHandleDataInCurrentGroup(pInfo, pBlock, rowIndex);
} }
return NULL; rowIndex += 1;
}
return BLOCK_SAME_GROUP;
} }
SSDataBlock* doSLimit(void* param, bool* newgroup) { SSDataBlock* doSLimit(void* param, bool* newgroup) {
...@@ -1093,63 +1105,41 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) { ...@@ -1093,63 +1105,41 @@ SSDataBlock* doSLimit(void* param, bool* newgroup) {
} }
SSLimitOperatorInfo *pInfo = pOperator->info; SSLimitOperatorInfo *pInfo = pOperator->info;
pInfo->pRes->info.rows = 0;
SSDataBlock *pBlock = NULL; if (pInfo->pPrevBlock != NULL) {
while (1) { ensureOutputBuf(pInfo, pInfo->pRes, pInfo->pPrevBlock->info.rows);
pBlock = skipGroupBlock(pOperator, newgroup); int32_t ret = doSlimitImpl(pOperator, pInfo, pInfo->pPrevBlock);
if (pBlock == NULL) { assert(ret != BLOCK_NEW_GROUP);
setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
pOperator->status = OP_EXEC_DONE;
return NULL;
}
if (*newgroup) { // a new group arrives
pInfo->groupTotal += 1;
pInfo->rowsTotal = 0;
pInfo->currentOffset = pInfo->limit.offset;
}
assert(pInfo->currentGroupOffset == 0); pInfo->pPrevBlock = NULL;
}
if (pInfo->currentOffset >= pBlock->info.rows) {
pInfo->currentOffset -= pBlock->info.rows;
} else {
if (pInfo->currentOffset == 0) {
break;
}
int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset);
pBlock->info.rows = remain;
// move the remain rows of this data block to the front. assert(pInfo->currentGroupOffset >= 0);
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
int16_t bytes = pColInfoData->info.bytes; while(1) {
memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
} SSDataBlock *pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
pInfo->currentOffset = 0; if (pBlock == NULL) {
break; return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
} }
}
if (pInfo->slimit.limit > 0 && pInfo->groupTotal > pInfo->slimit.limit) { // reach the group limit, abort
return NULL;
}
if (pInfo->limit.limit > 0 && (pInfo->rowsTotal + pBlock->info.rows >= pInfo->limit.limit)) { ensureOutputBuf(pInfo, pInfo->pRes, pBlock->info.rows);
pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->rowsTotal); int32_t ret = doSlimitImpl(pOperator, pInfo, pBlock);
pInfo->rowsTotal = pInfo->limit.limit; if (ret == BLOCK_NEW_GROUP) {
pInfo->pPrevBlock = pBlock;
return pInfo->pRes;
}
if (pInfo->slimit.limit > 0 && pInfo->groupTotal >= pInfo->slimit.limit) { if (pOperator->status == OP_EXEC_DONE) {
pOperator->status = OP_EXEC_DONE; return pInfo->pRes->info.rows == 0 ? NULL : pInfo->pRes;
} }
// setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED); // now the number of rows in current group is enough, let's return to the invoke function
} else { if (pInfo->pRes->info.rows > pInfo->threshold) {
pInfo->rowsTotal += pBlock->info.rows; return pInfo->pRes;
}
} }
return pBlock;
} }
...@@ -925,7 +925,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) { ...@@ -925,7 +925,6 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
pQueryInfo = pCmd->active; pQueryInfo = pCmd->active;
pQueryInfo->pUdfInfo = pUdfInfo; pQueryInfo->pUdfInfo = pUdfInfo;
pQueryInfo->udfCopy = true; pQueryInfo->udfCopy = true;
} }
} }
...@@ -6916,7 +6915,6 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo ...@@ -6916,7 +6915,6 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
const char* msg1 = "interval not allowed in group by normal column"; const char* msg1 = "interval not allowed in group by normal column";
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0); STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
SSchema* tagSchema = NULL; SSchema* tagSchema = NULL;
...@@ -8438,6 +8436,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS ...@@ -8438,6 +8436,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
if (taosArrayGetSize(subInfo->pSubquery) >= 2) { if (taosArrayGetSize(subInfo->pSubquery) >= 2) {
return invalidOperationMsg(msgBuf, "not support union in subquery"); return invalidOperationMsg(msgBuf, "not support union in subquery");
} }
SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo)); SQueryInfo* pSub = calloc(1, sizeof(SQueryInfo));
tscInitQueryInfo(pSub); tscInitQueryInfo(pSub);
...@@ -8460,6 +8459,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS ...@@ -8460,6 +8459,7 @@ static int32_t doValidateSubquery(SSqlNode* pSqlNode, int32_t index, SSqlObj* pS
if (pTableMetaInfo1 == NULL) { if (pTableMetaInfo1 == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
pTableMetaInfo1->pTableMeta = extractTempTableMetaFromSubquery(pSub); pTableMetaInfo1->pTableMeta = extractTempTableMetaFromSubquery(pSub);
pTableMetaInfo1->tableMetaCapacity = tscGetTableMetaSize(pTableMetaInfo1->pTableMeta); pTableMetaInfo1->tableMetaCapacity = tscGetTableMetaSize(pTableMetaInfo1->pTableMeta);
...@@ -8543,7 +8543,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -8543,7 +8543,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
// check if there is 3 level select // check if there is 3 level select
SRelElementPair* subInfo = taosArrayGet(pSqlNode->from->list, i); SRelElementPair* subInfo = taosArrayGet(pSqlNode->from->list, i);
SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0); SSqlNode* p = taosArrayGetP(subInfo->pSubquery, 0);
if (p->from->type == SQL_NODE_FROM_SUBQUERY){ if (p->from->type == SQL_NODE_FROM_SUBQUERY) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9); return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
} }
...@@ -8635,6 +8635,15 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf ...@@ -8635,6 +8635,15 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
} }
} }
// disable group result mixed up if interval/session window query exists.
if (isTimeWindowQuery(pQueryInfo)) {
size_t num = taosArrayGetSize(pQueryInfo->pUpstream);
for(int32_t i = 0; i < num; ++i) {
SQueryInfo* pUp = taosArrayGetP(pQueryInfo->pUpstream, i);
pUp->multigroupResult = false;
}
}
// parse the having clause in the first place // parse the having clause in the first place
int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1); int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1);
if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) != if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) !=
......
...@@ -3078,6 +3078,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) { ...@@ -3078,6 +3078,7 @@ void tscInitQueryInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->slimit.offset = 0; pQueryInfo->slimit.offset = 0;
pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES); pQueryInfo->pUpstream = taosArrayInit(4, POINTER_BYTES);
pQueryInfo->window = TSWINDOW_INITIALIZER; pQueryInfo->window = TSWINDOW_INITIALIZER;
pQueryInfo->multigroupResult = true;
} }
int32_t tscAddQueryInfo(SSqlCmd* pCmd) { int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
...@@ -3089,7 +3090,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) { ...@@ -3089,7 +3090,6 @@ int32_t tscAddQueryInfo(SSqlCmd* pCmd) {
} }
tscInitQueryInfo(pQueryInfo); tscInitQueryInfo(pQueryInfo);
pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer pQueryInfo->msg = pCmd->payload; // pointer to the parent error message buffer
if (pCmd->pQueryInfo == NULL) { if (pCmd->pQueryInfo == NULL) {
...@@ -3171,6 +3171,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { ...@@ -3171,6 +3171,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
pQueryInfo->window = pSrc->window; pQueryInfo->window = pSrc->window;
pQueryInfo->sessionWindow = pSrc->sessionWindow; pQueryInfo->sessionWindow = pSrc->sessionWindow;
pQueryInfo->pTableMetaInfo = NULL; pQueryInfo->pTableMetaInfo = NULL;
pQueryInfo->multigroupResult = pSrc->multigroupResult;
pQueryInfo->bufLen = pSrc->bufLen; pQueryInfo->bufLen = pSrc->bufLen;
pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery; pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery;
...@@ -3570,7 +3571,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3570,7 +3571,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNewQueryInfo->pTableMetaInfo = NULL; pNewQueryInfo->pTableMetaInfo = NULL;
pNewQueryInfo->bufLen = pQueryInfo->bufLen; pNewQueryInfo->bufLen = pQueryInfo->bufLen;
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen); pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
pNewQueryInfo->multigroupResult = pQueryInfo->multigroupResult;
pNewQueryInfo->distinct = pQueryInfo->distinct; pNewQueryInfo->distinct = pQueryInfo->distinct;
if (pNewQueryInfo->buf == NULL) { if (pNewQueryInfo->buf == NULL) {
...@@ -3795,12 +3796,12 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -3795,12 +3796,12 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry)) { if (code && !((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry)) {
pParentSql->res.code = code; pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql); tscAsyncResultOnError(pParentSql);
return; return;
} }
tscFreeSubobj(pParentSql); tscFreeSubobj(pParentSql);
tfree(pParentSql->pSubs); tfree(pParentSql->pSubs);
pParentSql->res.code = TSDB_CODE_SUCCESS; pParentSql->res.code = TSDB_CODE_SUCCESS;
...@@ -3809,9 +3810,9 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) { ...@@ -3809,9 +3810,9 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self, tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry); tstrerror(code), pParentSql->retry);
tscResetSqlCmd(&pParentSql->cmd, true); tscResetSqlCmd(&pParentSql->cmd, true);
code = tsParseSql(pParentSql, true); code = tsParseSql(pParentSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
return; return;
...@@ -3880,7 +3881,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) { ...@@ -3880,7 +3881,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pNew->maxRetry = pSql->maxRetry; pNew->maxRetry = pSql->maxRetry;
pNew->cmd.resColumnId = TSDB_RES_COL_ID; pNew->cmd.resColumnId = TSDB_RES_COL_ID;
tsem_init(&pNew->rspSem, 0, 0); tsem_init(&pNew->rspSem, 0, 0);
SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id SRetrieveSupport* ps = calloc(1, sizeof(SRetrieveSupport)); // todo use object id
...@@ -4416,7 +4417,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, ...@@ -4416,7 +4417,7 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
size_t sz = 0; size_t sz = 0;
STableMeta* pChild = *ppChild; STableMeta* pChild = *ppChild;
STableMeta* pChild1; STableMeta* pChild1;
taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz); taosHashGetCloneExt(tscTableMetaMap, pChild->sTableName, strnlen(pChild->sTableName, TSDB_TABLE_FNAME_LEN), NULL, (void **)&p, &sz);
// tableMeta exists, build child table meta according to the super table meta // tableMeta exists, build child table meta according to the super table meta
...@@ -4427,9 +4428,9 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name, ...@@ -4427,9 +4428,9 @@ int32_t tscCreateTableMetaFromSTableMeta(STableMeta** ppChild, const char* name,
int32_t tableMetaSize = sizeof(STableMeta) + totalBytes; int32_t tableMetaSize = sizeof(STableMeta) + totalBytes;
if (*tableMetaCapacity < tableMetaSize) { if (*tableMetaCapacity < tableMetaSize) {
pChild1 = realloc(pChild, tableMetaSize); pChild1 = realloc(pChild, tableMetaSize);
if(pChild1 == NULL) if(pChild1 == NULL)
return -1; return -1;
pChild = pChild1; pChild = pChild1;
*tableMetaCapacity = (size_t)tableMetaSize; *tableMetaCapacity = (size_t)tableMetaSize;
} }
...@@ -4701,6 +4702,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -4701,6 +4702,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->distinct = pQueryInfo->distinct; pQueryAttr->distinct = pQueryInfo->distinct;
pQueryAttr->sw = pQueryInfo->sessionWindow; pQueryAttr->sw = pQueryInfo->sessionWindow;
pQueryAttr->stateWindow = pQueryInfo->stateWindow; pQueryAttr->stateWindow = pQueryInfo->stateWindow;
pQueryAttr->multigroupResult = pQueryInfo->multigroupResult;
pQueryAttr->numOfCols = numOfCols; pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput; pQueryAttr->numOfOutput = numOfOutput;
......
...@@ -219,6 +219,7 @@ typedef struct SQueryAttr { ...@@ -219,6 +219,7 @@ typedef struct SQueryAttr {
bool distinct; // distinct query or not bool distinct; // distinct query or not
bool stateWindow; // window State on sub/normal table bool stateWindow; // window State on sub/normal table
bool createFilterOperator; // if filter operator is needed bool createFilterOperator; // if filter operator is needed
bool multigroupResult; // multigroup result can exist in one SSDataBlock
int32_t interBufSize; // intermediate buffer sizse int32_t interBufSize; // intermediate buffer sizse
int32_t havingNum; // having expr number int32_t havingNum; // having expr number
...@@ -460,16 +461,23 @@ typedef struct SLimitOperatorInfo { ...@@ -460,16 +461,23 @@ typedef struct SLimitOperatorInfo {
} SLimitOperatorInfo; } SLimitOperatorInfo;
typedef struct SSLimitOperatorInfo { typedef struct SSLimitOperatorInfo {
int64_t groupTotal; int64_t groupTotal;
int64_t currentGroupOffset; int64_t currentGroupOffset;
int64_t rowsTotal; int64_t rowsTotal;
int64_t currentOffset; int64_t currentOffset;
SLimitVal limit; SLimitVal limit;
SLimitVal slimit; SLimitVal slimit;
char **prevRow; char **prevRow;
SArray *orderColumnList; SArray *orderColumnList;
bool hasPrev;
bool ignoreCurrentGroup;
bool multigroupResult;
SSDataBlock *pRes; // result buffer
SSDataBlock *pPrevBlock;
int64_t capacity;
int64_t threshold;
} SSLimitOperatorInfo; } SSLimitOperatorInfo;
typedef struct SFilterOperatorInfo { typedef struct SFilterOperatorInfo {
...@@ -481,8 +489,9 @@ typedef struct SFillOperatorInfo { ...@@ -481,8 +489,9 @@ typedef struct SFillOperatorInfo {
SFillInfo *pFillInfo; SFillInfo *pFillInfo;
SSDataBlock *pRes; SSDataBlock *pRes;
int64_t totalInputRows; int64_t totalInputRows;
void **p;
SSDataBlock *existNewGroupBlock; SSDataBlock *existNewGroupBlock;
bool multigroupResult;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
...@@ -544,9 +553,9 @@ typedef struct SMultiwayMergeInfo { ...@@ -544,9 +553,9 @@ typedef struct SMultiwayMergeInfo {
bool hasDataBlockForNewGroup; bool hasDataBlockForNewGroup;
SSDataBlock *pExistBlock; SSDataBlock *pExistBlock;
bool hasPrev;
bool groupMix;
SArray *udfInfo; SArray *udfInfo;
bool hasPrev;
bool multiGroupResults;
} SMultiwayMergeInfo; } SMultiwayMergeInfo;
// todo support the disk-based sort // todo support the disk-based sort
...@@ -568,7 +577,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI ...@@ -568,7 +577,7 @@ SOperatorInfo* createLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorI
SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult);
SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
...@@ -577,10 +586,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf ...@@ -577,10 +586,10 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t numOfRows, void* merger, bool groupMix); int32_t numOfRows, void* merger);
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo); SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp);
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger); SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult);
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
......
...@@ -148,6 +148,7 @@ typedef struct SQueryInfo { ...@@ -148,6 +148,7 @@ typedef struct SQueryInfo {
bool orderProjectQuery; bool orderProjectQuery;
bool stateWindow; bool stateWindow;
bool globalMerge; bool globalMerge;
bool multigroupResult;
} SQueryInfo; } SQueryInfo;
/** /**
......
...@@ -39,15 +39,12 @@ ...@@ -39,15 +39,12 @@
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey)) #define TSWINDOW_IS_EQUAL(t1, t2) (((t1).skey == (t2).skey) && ((t1).ekey == (t2).ekey))
#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) #define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC))
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0} #define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
#define MULTI_KEY_DELIM "-" #define MULTI_KEY_DELIM "-"
#define HASH_CAPACITY_LIMIT 10000000
#define TIME_WINDOW_COPY(_dst, _src) do {\ #define TIME_WINDOW_COPY(_dst, _src) do {\
(_dst).skey = (_src).skey;\ (_dst).skey = (_src).skey;\
(_dst).ekey = (_src).ekey;\ (_dst).ekey = (_src).ekey;\
...@@ -968,8 +965,6 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t ...@@ -968,8 +965,6 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
break; break;
} }
} }
return;
} }
static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
...@@ -1338,7 +1333,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, ...@@ -1338,7 +1333,7 @@ void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo,
} else { } else {
pCtx[k].start.ptr = (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes; pCtx[k].start.ptr = (char *)pColInfo->pData + prevRowIndex * pColInfo->info.bytes;
} }
pCtx[k].end.ptr = (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes; pCtx[k].end.ptr = (char *)pColInfo->pData + curRowIndex * pColInfo->info.bytes;
} }
} }
...@@ -1627,7 +1622,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -1627,7 +1622,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep); doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); doApplyFunctions(pRuntimeEnv, pInfo->pCtx, ascQuery ? &win : &preWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
preWin = win; preWin = win;
int32_t prevEndPos = (forwardStep - 1) * step + startPos; int32_t prevEndPos = (forwardStep - 1) * step + startPos;
startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos); startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos);
if (startPos < 0) { if (startPos < 0) {
...@@ -2266,30 +2261,30 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf ...@@ -2266,30 +2261,30 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Fill: { case OP_Fill: {
SOperatorInfo* pInfo = pRuntimeEnv->proot; SOperatorInfo* pInfo = pRuntimeEnv->proot;
pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput); pRuntimeEnv->proot = createFillOperatorInfo(pRuntimeEnv, pInfo, pInfo->pExpr, pInfo->numOfOutput, pQueryAttr->multigroupResult);
break; break;
} }
case OP_MultiwayMergeSort: { case OP_MultiwayMergeSort: {
bool groupMix = true; pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, 4096, merger);
if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
groupMix = false;
}
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput,
4096, merger, groupMix); // TODO hack it
break; break;
} }
case OP_GlobalAggregate: { case OP_GlobalAggregate: { // If fill operator exists, the result rows of different group can not be in the same SSDataBlock.
bool multigroupResult = pQueryAttr->multigroupResult;
if (pQueryAttr->multigroupResult) {
multigroupResult = (pQueryAttr->fillType == TSDB_FILL_NONE);
}
pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, pRuntimeEnv->proot = createGlobalAggregateOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo); pQueryAttr->numOfExpr3, merger, pQueryAttr->pUdfInfo, multigroupResult);
break; break;
} }
case OP_SLimit: { case OP_SLimit: {
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3, int32_t num = pRuntimeEnv->proot->numOfOutput;
pQueryAttr->numOfExpr3, merger); SExprInfo* pExpr = pRuntimeEnv->proot->pExpr;
pRuntimeEnv->proot = createSLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pExpr, num, merger, pQueryAttr->multigroupResult);
break; break;
} }
...@@ -3648,7 +3643,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf ...@@ -3648,7 +3643,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOf
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i); SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows; pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows;
// re-estabilish output buffer pointer. // set the correct pointer after the memory buffer reallocated.
int32_t functionId = pBInfo->pCtx[i].functionId; int32_t functionId = pBInfo->pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) { if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
if(i>0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput; if(i>0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i-1].pOutput;
...@@ -4213,6 +4208,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti ...@@ -4213,6 +4208,7 @@ static void toSSDataBlock(SGroupResInfo *pGroupResInfo, SQueryRuntimeEnv* pRunti
// refactor : extract method // refactor : extract method
SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0); SColumnInfoData* pInfoData = taosArrayGet(pBlock->pDataBlock, 0);
//add condition (pBlock->info.rows >= 1) just to runtime happy //add condition (pBlock->info.rows >= 1) just to runtime happy
if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) { if (pInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP && pBlock->info.rows >= 1) {
STimeWindow* w = &pBlock->info.window; STimeWindow* w = &pBlock->info.window;
...@@ -4292,15 +4288,15 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data ...@@ -4292,15 +4288,15 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
} }
} }
int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity) { int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutput, int32_t capacity, void** p) {
void** p = calloc(pFillInfo->numOfCols, POINTER_BYTES);
for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for(int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pOutput->pDataBlock, i);
p[i] = pColInfoData->pData; p[i] = pColInfoData->pData + (pColInfoData->info.bytes * pOutput->info.rows);
} }
pOutput->info.rows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity); int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, p, capacity - pOutput->info.rows);
tfree(p); pOutput->info.rows += numOfRows;
return pOutput->info.rows; return pOutput->info.rows;
} }
...@@ -5344,11 +5340,12 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -5344,11 +5340,12 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) { static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param; SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
taosArrayDestroy(pInfo->orderColumnList); taosArrayDestroy(pInfo->orderColumnList);
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
tfree(pInfo->prevRow); tfree(pInfo->prevRow);
} }
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream,
SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo) { SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
pInfo->resultRowFactor = pInfo->resultRowFactor =
...@@ -5356,15 +5353,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5356,15 +5353,14 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx pRuntimeEnv->scanFlag = MERGE_STAGE; // TODO init when creating pCtx
pInfo->pMerge = param; pInfo->multiGroupResults = groupResultMixedUp;
pInfo->bufCapacity = 4096; pInfo->pMerge = param;
pInfo->udfInfo = pUdfInfo; pInfo->bufCapacity = 4096;
pInfo->udfInfo = pUdfInfo;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pInfo->bufCapacity * pInfo->resultRowFactor);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->orderColumnList = getOrderCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->groupColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
// TODO refactor // TODO refactor
int32_t len = 0; int32_t len = 0;
...@@ -5417,17 +5413,15 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, ...@@ -5417,17 +5413,15 @@ SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv,
} }
SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput, SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SExprInfo *pExpr, int32_t numOfOutput,
int32_t numOfRows, void *merger, bool groupMix) { int32_t numOfRows, void *merger) {
SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo)); SMultiwayMergeInfo* pInfo = calloc(1, sizeof(SMultiwayMergeInfo));
pInfo->pMerge = merger; pInfo->pMerge = merger;
pInfo->groupMix = groupMix; pInfo->bufCapacity = numOfRows;
pInfo->bufCapacity = numOfRows;
pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr); pInfo->orderColumnList = getResultGroupCheckColumns(pRuntimeEnv->pQueryAttr);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows); pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, numOfRows);
{ { // todo extract method to create prev compare buffer
int32_t len = 0; int32_t len = 0;
for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t i = 0; i < numOfOutput; ++i) {
len += pExpr[i].base.colBytes; len += pExpr[i].base.colBytes;
...@@ -5435,8 +5429,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx ...@@ -5435,8 +5429,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
int32_t offset = POINTER_BYTES * numOfCols;
int32_t offset = POINTER_BYTES * numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
...@@ -5452,7 +5446,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx ...@@ -5452,7 +5446,8 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
pOperator->status = OP_IN_EXECUTING; pOperator->status = OP_IN_EXECUTING;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv; pOperator->pRuntimeEnv = pRuntimeEnv;
pOperator->numOfOutput = pRuntimeEnv->pQueryAttr->numOfCols; pOperator->numOfOutput = numOfOutput;
pOperator->pExpr = pExpr;
pOperator->exec = doMultiwayMergeSort; pOperator->exec = doMultiwayMergeSort;
pOperator->cleanup = destroyGlobalAggOperatorInfo; pOperator->cleanup = destroyGlobalAggOperatorInfo;
return pOperator; return pOperator;
...@@ -6362,19 +6357,13 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { ...@@ -6362,19 +6357,13 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) {
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
static SSDataBlock* doFill(void* param, bool* newgroup) { static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo *pInfo, SQueryRuntimeEnv *pRuntimeEnv, bool *newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SFillOperatorInfo *pInfo = pOperator->info;
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
if (taosFillHasMoreResults(pInfo->pFillInfo)) { if (taosFillHasMoreResults(pInfo->pFillInfo)) {
*newgroup = false; *newgroup = false;
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity, pInfo->p);
return pInfo->pRes; if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult)) {
return;
}
} }
// handle the cached new group data block // handle the cached new group data block
...@@ -6386,12 +6375,48 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -6386,12 +6375,48 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
pInfo->existNewGroupBlock = NULL; pInfo->existNewGroupBlock = NULL;
*newgroup = true; *newgroup = true;
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; }
}
static SSDataBlock* doFill(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
SFillOperatorInfo *pInfo = pOperator->info;
pInfo->pRes->info.rows = 0;
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
} }
SQueryRuntimeEnv *pRuntimeEnv = pOperator->pRuntimeEnv;
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) {
return pInfo->pRes;
}
// if (taosFillHasMoreResults(pInfo->pFillInfo)) {
// *newgroup = false;
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
// return pInfo->pRes;
// }
//
// // handle the cached new group data block
// if (pInfo->existNewGroupBlock) {
// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
//
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
//
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
// pInfo->existNewGroupBlock = NULL;
// *newgroup = true;
// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
// }
while(1) { while(1) {
publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup); SSDataBlock* pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
...@@ -6405,8 +6430,8 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -6405,8 +6430,8 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
pInfo->existNewGroupBlock = pBlock; pInfo->existNewGroupBlock = pBlock;
*newgroup = false; *newgroup = false;
// fill the previous group data block // Fill the previous group data block, before handle the data block of new group.
// before handle a new data block, close the fill operation for previous group data block // Close the fill operation for previous group data block
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
} else { } else {
if (pBlock == NULL) { if (pBlock == NULL) {
...@@ -6418,28 +6443,61 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -6418,28 +6443,61 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey); taosFillSetStartInfo(pInfo->pFillInfo, 0, pRuntimeEnv->pQueryAttr->window.ekey);
} else { } else {
pInfo->totalInputRows += pBlock->info.rows; pInfo->totalInputRows += pBlock->info.rows;
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey);
int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey
: */pBlock->info.window.ekey;
taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock); taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock);
} }
} }
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
if (pInfo->pRes->info.rows > 0) { // current group has no more result to return
return pInfo->pRes; // current group has no more result to return
if (pInfo->pRes->info.rows > 0) {
// 1. The result in current group not reach the threshold of output result, continue
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
return pInfo->pRes;
}
doHandleRemainBlockFromNewGroup(pInfo, pRuntimeEnv, newgroup);
if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold || pBlock == NULL) {
return pInfo->pRes;
}
// if (taosFillHasMoreResults(pInfo->pFillInfo)) {
// *newgroup = false;
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pRuntimeEnv->resultInfo.capacity);
// return pInfo->pRes;
// }
//
// // handle the cached new group data block
// if (pInfo->existNewGroupBlock) {
// pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
// int64_t ekey = Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED)?pRuntimeEnv->pQueryAttr->window.ekey:pInfo->existNewGroupBlock->info.window.ekey;
// taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
//
// taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
// taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
//
// doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity);
// pInfo->existNewGroupBlock = NULL;
// *newgroup = true;
//
// if (pInfo->pRes->info.rows > pRuntimeEnv->resultInfo.threshold) {
// return pInfo->pRes;
// }
//
//// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
// }
} else if (pInfo->existNewGroupBlock) { // try next group } else if (pInfo->existNewGroupBlock) { // try next group
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
int64_t ekey = /*Q_STATUS_EQUAL(pRuntimeEnv->status, QUERY_COMPLETED) ? pRuntimeEnv->pQueryAttr->window.ekey int64_t ekey = pInfo->existNewGroupBlock->info.window.ekey;
:*/ pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start); taosResetFillInfo(pInfo->pFillInfo, pInfo->pFillInfo->start);
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock);
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pRuntimeEnv->resultInfo.capacity, pInfo->p);
pInfo->existNewGroupBlock = NULL; pInfo->existNewGroupBlock = NULL;
*newgroup = true; *newgroup = true;
...@@ -6447,7 +6505,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) { ...@@ -6447,7 +6505,6 @@ static SSDataBlock* doFill(void* param, bool* newgroup) {
} else { } else {
return NULL; return NULL;
} }
// return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
} }
} }
...@@ -6548,6 +6605,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) { ...@@ -6548,6 +6605,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param; SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo); pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes); pInfo->pRes = destroyOutputBuf(pInfo->pRes);
tfree(pInfo->p);
} }
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) { static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput) {
...@@ -6891,10 +6949,10 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato ...@@ -6891,10 +6949,10 @@ SOperatorInfo* createGroupbyOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
return pOperator; return pOperator;
} }
SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult) {
int32_t numOfOutput) {
SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo)); SFillOperatorInfo* pInfo = calloc(1, sizeof(SFillOperatorInfo));
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
pInfo->multigroupResult = multigroupResult;
{ {
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -6909,6 +6967,8 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -6909,6 +6967,8 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput, taosCreateFillInfo(pQueryAttr->order.order, w.skey, 0, (int32_t)pRuntimeEnv->resultInfo.capacity, numOfOutput,
pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit, pQueryAttr->interval.sliding, pQueryAttr->interval.slidingUnit,
(int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo); (int8_t)pQueryAttr->precision, pQueryAttr->fillType, pColInfo, pRuntimeEnv->qinfo);
pInfo->p = calloc(pInfo->pFillInfo->numOfCols, POINTER_BYTES);
} }
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
...@@ -6928,7 +6988,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn ...@@ -6928,7 +6988,7 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
return pOperator; return pOperator;
} }
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger) { SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* pMerger, bool multigroupResult) {
SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo)); SSLimitOperatorInfo* pInfo = calloc(1, sizeof(SSLimitOperatorInfo));
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
...@@ -6936,9 +6996,11 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -6936,9 +6996,11 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr); pInfo->orderColumnList = getResultGroupCheckColumns(pQueryAttr);
pInfo->slimit = pQueryAttr->slimit; pInfo->slimit = pQueryAttr->slimit;
pInfo->limit = pQueryAttr->limit; pInfo->limit = pQueryAttr->limit;
pInfo->capacity = pRuntimeEnv->resultInfo.capacity;
pInfo->threshold = (int64_t) (pInfo->capacity * 0.8);
pInfo->currentOffset = pQueryAttr->limit.offset;
pInfo->currentGroupOffset = pQueryAttr->slimit.offset; pInfo->currentGroupOffset = pQueryAttr->slimit.offset;
pInfo->currentOffset = pQueryAttr->limit.offset; pInfo->multigroupResult= multigroupResult;
// TODO refactor // TODO refactor
int32_t len = 0; int32_t len = 0;
...@@ -6946,10 +7008,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -6946,10 +7008,10 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
len += pExpr[i].base.resBytes; len += pExpr[i].base.resBytes;
} }
int32_t numOfCols = pInfo->orderColumnList != NULL? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0; int32_t numOfCols = (pInfo->orderColumnList != NULL)? (int32_t) taosArrayGetSize(pInfo->orderColumnList):0;
pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len)); pInfo->prevRow = calloc(1, (POINTER_BYTES * numOfCols + len));
int32_t offset = POINTER_BYTES * numOfCols;
int32_t offset = POINTER_BYTES * numOfCols;
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
pInfo->prevRow[i] = (char*)pInfo->prevRow + offset; pInfo->prevRow[i] = (char*)pInfo->prevRow + offset;
...@@ -6957,6 +7019,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator ...@@ -6957,6 +7019,8 @@ SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
offset += pExpr[index->colIndex].base.resBytes; offset += pExpr[index->colIndex].base.resBytes;
} }
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SLimitOperator"; pOperator->name = "SLimitOperator";
...@@ -7195,8 +7259,8 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { ...@@ -7195,8 +7259,8 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
break; break;
} }
// ensure result output buf // ensure result output buf
if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) { if (pRes->info.rows + pBlock->info.rows > pInfo->outputCapacity) {
int32_t newSize = pRes->info.rows + pBlock->info.rows; int32_t newSize = pRes->info.rows + pBlock->info.rows;
for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) { for (int i = 0; i < taosArrayGetSize(pRes->pDataBlock); i++) {
...@@ -7219,7 +7283,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { ...@@ -7219,7 +7283,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) { for (int j = 0; j < taosArrayGetSize(pRes->pDataBlock); j++) {
SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info SDistinctDataInfo* pDistDataInfo = taosArrayGet(pInfo->pDistinctDataInfo, j); // distinct meta info
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); //src SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pDistDataInfo->index); //src
SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist SColumnInfoData* pResultColInfoData = taosArrayGet(pRes->pDataBlock, j); // dist
char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i; char* val = ((char*)pColInfoData->pData) + pDistDataInfo->bytes * i;
char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows; char *start = pResultColInfoData->pData + pDistDataInfo->bytes * pInfo->pRes->info.rows;
memcpy(start, val, pDistDataInfo->bytes); memcpy(start, val, pDistDataInfo->bytes);
...@@ -7237,7 +7301,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) { ...@@ -7237,7 +7301,7 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo)); SDistinctOperatorInfo* pInfo = calloc(1, sizeof(SDistinctOperatorInfo));
pInfo->totalBytes = 0; pInfo->totalBytes = 0;
pInfo->buf = NULL; pInfo->buf = NULL;
pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold pInfo->threshold = tsMaxNumOfDistinctResults; // distinct result threshold
......
...@@ -430,7 +430,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) ...@@ -430,7 +430,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput)
SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i); SColumnInfoData* pColData = taosArrayGet(pInput->pDataBlock, i);
pFillInfo->pData[i] = pColData->pData; pFillInfo->pData[i] = pColData->pData;
if (TSDB_COL_IS_TAG(pCol->flag)/* || IS_VAR_DATA_TYPE(pCol->col.type)*/) { // copy the tag value to tag value buffer if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
assert (pTag->col.colId == pCol->col.colId); assert (pTag->col.colId == pCol->col.colId);
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy?? memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册