提交 4527447b 编写于 作者: H Haojun Liao

fix(query): handle the indefinite-rows function output with partition by

上级 ac8e9d10
......@@ -323,6 +323,9 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) {
if ((asc && (win.ekey < pReader->window.skey)) || ((!asc) && (win.skey > pReader->window.ekey))) {
pIter->index += step;
if ((asc && pIter->index >= pIter->numOfFiles) || ((!asc) && pIter->index < 0)) {
return false;
}
continue;
}
......
......@@ -529,6 +529,9 @@ typedef struct SIndefOperatorInfo {
SArray* pPseudoColInfo;
SExprSupp scalarSup;
SNode* pCondition;
uint64_t groupId;
SSDataBlock* pNextGroupRes;
} SIndefOperatorInfo;
typedef struct SFillOperatorInfo {
......
......@@ -571,8 +571,8 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
pResult->info.groupId = pSrcBlock->info.groupId;
// if the source equals to the destination, it is to create a new column as the result of scalar function or some
// operators.
// if the source equals to the destination, it is to create a new column as the result of scalar
// function or some operators.
bool createNewColModel = (pResult == pSrcBlock);
int32_t numOfRows = 0;
......@@ -580,17 +580,17 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
for (int32_t k = 0; k < numOfOutput; ++k) {
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
SqlFunctionCtx* pfCtx = &pCtx[k];
SInputColumnInfoData* pInputData = &pfCtx->input;
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
if (pResult->info.rows > 0 && !createNewColModel) {
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pfCtx->input.pData[0],
pfCtx->input.numOfRows);
colDataMergeCol(pColInfoData, pResult->info.rows, &pResult->info.capacity, pInputData->pData[0], pInputData->numOfRows);
} else {
colDataAssign(pColInfoData, pfCtx->input.pData[0], pfCtx->input.numOfRows, &pResult->info);
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
}
numOfRows = pfCtx->input.numOfRows;
numOfRows = pInputData->numOfRows;
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
......@@ -623,14 +623,12 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
numOfRows = dest.numOfRows;
taosArrayDestroy(pBlockList);
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
ASSERT(!fmIsAggFunc(pfCtx->functionId));
// _rowts/_c0, not tbname column
if (fmIsPseudoColumnFunc(pfCtx->functionId) && (!fmIsScanPseudoColumnFunc(pfCtx->functionId))) {
// do nothing
} else if (fmIsIndefiniteRowsFunc(pfCtx->functionId)) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[k]);
pfCtx->fpSet.init(&pCtx[k], pResInfo);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pfCtx);
pfCtx->fpSet.init(pfCtx, pResInfo);
pfCtx->pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
pfCtx->offset = createNewColModel ? 0 : pResult->info.rows; // set the start offset
......@@ -642,6 +640,23 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
}
numOfRows = pfCtx->fpSet.process(pfCtx);
} else if (fmIsAggFunc(fmIsAggFunc(pfCtx->functionId))) {
// _group_key function for "partition by tbname" + csum(col_name) query
SColumnInfoData* pOutput = taosArrayGet(pResult->pDataBlock, outputSlotId);
int32_t slotId = pfCtx->param[0].pCol->slotId;
// todo handle the json tag
SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
for(int32_t f = 0; f < pSrcBlock->info.rows; ++f) {
bool isNull = colDataIsNull_s(pInput, f);
if (isNull) {
colDataAppendNULL(pOutput, pResult->info.rows + f);
} else {
char* data = colDataGetData(pInput, f);
colDataAppend(pOutput, pResult->info.rows + f, data, isNull);
}
}
} else {
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
taosArrayPush(pBlockList, &pSrcBlock);
......@@ -675,25 +690,6 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
return TSDB_CODE_SUCCESS;
}
/*
 * Store the key value found at pData into the window of a result row.
 *
 * pResultRow  result row whose window (win.skey / win.ekey) is updated
 * pData       pointer to the raw key value
 * type        data type tag used to decode pData
 *
 * For variable-length types nothing is stored: copying the key into the
 * result row is currently disabled (todo in the original code). For every
 * other type the value is decoded as an int64_t and written to both ends of
 * the window, so skey == ekey afterwards.
 */
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
  // Variable-length keys are deliberately ignored for now.
  if (IS_VAR_DATA_TYPE(type)) {
    return;
  }

  // -1 is the initial value handed to GET_TYPED_DATA, which decodes pData according to `type`.
  int64_t key = -1;
  GET_TYPED_DATA(key, int64_t, type, pData);

  pResultRow->win.skey = key;
  pResultRow->win.ekey = key;
}
bool functionNeedToExecute(SqlFunctionCtx* pCtx) {
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
......@@ -3825,6 +3821,40 @@ _error:
return NULL;
}
/*
 * Feed one input block through the indefinite-rows operator and append the
 * produced rows to the operator's result block (binfo.pRes).
 *
 * pOperator   the indefinite-rows operator; pOperator->info is its SIndefOperatorInfo
 * pBlock      input block to process; scalar pre-expressions are evaluated in place on it
 * downstream  downstream operator, queried for the scan order and scan flag
 * pTaskInfo   task context; its jump buffer (env) is the error exit
 *
 * Steps:
 *   1. query scan order / scan flag from the downstream operator;
 *   2. if scalar pre-expressions exist, evaluate them with pBlock as both
 *      source and destination;
 *   3. bind pBlock as the input of the function contexts;
 *   4. grow the result block so it can hold the existing rows plus the rows of pBlock;
 *   5. apply the operator's expressions, writing into the result block.
 *
 * This function does not return an error code: any failing step leaves via
 * longjmp(pTaskInfo->env, code).
 */
static void doHandleDataBlock(SOperatorInfo* pOperator, SSDataBlock* pBlock, SOperatorInfo* downstream, SExecTaskInfo* pTaskInfo) {
  SIndefOperatorInfo* pIndefInfo = pOperator->info;
  SOptrBasicInfo*     pInfo = &pIndefInfo->binfo;
  SExprSupp*          pSup = &pOperator->exprSupp;

  int32_t order = 0;
  int32_t scanFlag = 0;

  // The scan order/flag are looked up for every incoming block.
  int32_t code = getTableScanInfo(downstream, &order, &scanFlag);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, code);
  }

  // Scalar expressions, if any, must be calculated before the indefinite-rows functions are applied.
  // Source and destination are the same block here, so the results are added to pBlock itself.
  SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
  if (pScalarSup->pExprInfo != NULL) {
    code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
                                 pIndefInfo->pPseudoColInfo);
    if (code != TSDB_CODE_SUCCESS) {
      longjmp(pTaskInfo->env, code);
    }
  }

  setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);

  // Make room for the rows already buffered plus the rows of this block. A failure to grow the
  // buffer must not be ignored, otherwise the functions below would write beyond the result block.
  code = blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, code);
  }

  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs,
                               pIndefInfo->pPseudoColInfo);
  if (code != TSDB_CODE_SUCCESS) {
    longjmp(pTaskInfo->env, code);
  }
}
static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
SIndefOperatorInfo* pIndefInfo = pOperator->info;
SOptrBasicInfo* pInfo = &pIndefInfo->binfo;
......@@ -3839,8 +3869,6 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
}
int64_t st = 0;
int32_t order = 0;
int32_t scanFlag = 0;
if (pOperator->cost.openCost == 0) {
st = taosGetTimestampUs();
......@@ -3848,42 +3876,54 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
break;
}
while(1) {
// here we need to handle the existsed group results
if (pIndefInfo->pNextGroupRes != NULL) { // todo extract method
for (int32_t k = 0; k < pSup->numOfExprs; ++k) {
SqlFunctionCtx* pCtx = &pSup->pCtx[k];
// the pDataBlock are always the same one, no need to call this again
int32_t code = getTableScanInfo(pOperator->pDownstream[0], &order, &scanFlag);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
pResInfo->initialized = false;
pCtx->pOutput = NULL;
}
doHandleDataBlock(pOperator, pIndefInfo->pNextGroupRes, downstream, pTaskInfo);
pIndefInfo->pNextGroupRes = NULL;
}
// there is an scalar expression that needs to be calculated before apply the group aggregation.
SExprSupp* pScalarSup = &pIndefInfo->scalarSup;
if (pScalarSup->pExprInfo != NULL) {
code = projectApplyFunctions(pScalarSup->pExprInfo, pBlock, pBlock, pScalarSup->pCtx, pScalarSup->numOfExprs,
pIndefInfo->pPseudoColInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) {
while (1) {
// The downstream exec may change the value of the newgroup, so use a local variable instead.
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
if (pBlock == NULL) {
doSetOperatorCompleted(pOperator);
break;
}
if (pIndefInfo->groupId == 0 && pBlock->info.groupId != 0) {
pIndefInfo->groupId = pBlock->info.groupId; // this is the initial group result
} else {
if (pIndefInfo->groupId != pBlock->info.groupId) { // reset output buffer and computing status
pIndefInfo->groupId = pBlock->info.groupId;
pIndefInfo->pNextGroupRes = pBlock;
break;
}
}
doHandleDataBlock(pOperator, pBlock, downstream, pTaskInfo);
if (pInfo->pRes->info.rows >= pOperator->resultInfo.threshold) {
break;
}
}
}
setInputDataBlock(pOperator, pSup->pCtx, pBlock, order, scanFlag, false);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
code = projectApplyFunctions(pOperator->exprSupp.pExprInfo, pInfo->pRes, pBlock, pSup->pCtx,
pOperator->exprSupp.numOfExprs, pIndefInfo->pPseudoColInfo);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, code);
doFilter(pIndefInfo->pCondition, pInfo->pRes);
size_t rows = pInfo->pRes->info.rows;
if (rows >= 0) {
break;
}
}
doFilter(pIndefInfo->pCondition, pInfo->pRes);
size_t rows = pInfo->pRes->info.rows;
pOperator->resultInfo.totalRows += rows;
......@@ -3928,24 +3968,23 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
if (numOfRows * pResBlock->info.rowSize > TWOMB) {
numOfRows = TWOMB / pResBlock->info.rowSize;
}
initResultSizeInfo(pOperator, numOfRows);
initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str);
initBasicInfo(&pInfo->binfo, pResBlock);
setFunctionResultOutput(pOperator, &pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, numOfExpr);
pInfo->binfo.pRes = pResBlock;
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pInfo->pCondition = pPhyNode->node.pConditions;
pInfo->binfo.pRes = pResBlock;
pInfo->pCondition = pPhyNode->node.pConditions;
pInfo->pPseudoColInfo= setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
pOperator->name = "IndefinitOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->exprSupp.numOfExprs = numOfExpr;
pOperator->name = "IndefinitOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC;
pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo;
pOperator->pTaskInfo = pTaskInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册