提交 a4c7e78d 编写于 作者: H Haojun Liao

[td-14393] refactor.

上级 78b1bf27
...@@ -505,7 +505,7 @@ typedef struct SProjectOperatorInfo { ...@@ -505,7 +505,7 @@ typedef struct SProjectOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SAggSupporter aggSup; SAggSupporter aggSup;
SSDataBlock *existDataBlock; SSDataBlock *existDataBlock;
int32_t threshold; SArray *pPseudoColInfo;
SLimit limit; SLimit limit;
int64_t curOffset; int64_t curOffset;
int64_t curOutput; int64_t curOutput;
......
...@@ -1244,8 +1244,21 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction ...@@ -1244,8 +1244,21 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
} }
} }
static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, SArray* pPseudoList) {
size_t num = 0;
if (pPseudoList != NULL) {
num = taosArrayGetSize(pPseudoList);
}
for (int32_t i = 0; i < num; ++i) {
pCtx[i].pOutput = taosArrayGet(pResult->pDataBlock, i);
}
}
static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput) { int32_t numOfOutput, SArray* pPseudoList) {
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k); SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
...@@ -1267,16 +1280,20 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData ...@@ -1267,16 +1280,20 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData
ASSERT(!fmIsAggFunc(pCtx[k].functionId)); ASSERT(!fmIsAggFunc(pCtx[k].functionId));
if (fmIsPseudoColumnFunc(pCtx[k].functionId)) { if (fmIsPseudoColumnFunc(pCtx[k].functionId)) {
// TODO: set the correct _rowts column output buffer, there may be multiple _rowts columns // do nothing
} else if (fmIsNonstandardSQLFunc(pCtx[k].functionId)) { } else if (fmIsNonstandardSQLFunc(pCtx[k].functionId)) {
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k); // todo set the correct timestamp column
pCtx[k].input.pPTS = taosArrayGet(pSrcBlock->pDataBlock, 1);
pCtx[k].ptsList = 0;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[k]); SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[k]);
pCtx[k].fpSet.init(&pCtx[k], pResInfo); pCtx[k].fpSet.init(&pCtx[k], pResInfo);
pCtx[k].pOutput = (char*)pColInfoData; pCtx[k].pOutput = taosArrayGet(pResult->pDataBlock, k);
// pCtx[k].pTsOutput = pCtx[k].offset = pResult->info.rows; // set the start offset
int32_t* outputColIndex = taosArrayGet(pPseudoList, 0);
pCtx[k].pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput;
int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]); int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]);
pResult->info.rows += numOfRows; pResult->info.rows += numOfRows;
} else { } else {
...@@ -1929,7 +1946,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num ...@@ -1929,7 +1946,7 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num
pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
pCtx->pTsOutput = NULL; pCtx->pTsOutput = NULL;//taosArrayInit(4, POINTER_BYTES);
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
pCtx->resDataInfo.type = pFunct->resSchema.type; pCtx->resDataInfo.type = pFunct->resSchema.type;
pCtx->order = TSDB_ORDER_ASC; pCtx->order = TSDB_ORDER_ASC;
...@@ -5382,12 +5399,12 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) ...@@ -5382,12 +5399,12 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup)
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput); projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput, pProjectInfo->pPseudoColInfo);
// todo extract method
if (pProjectInfo->curOffset < pInfo->pRes->info.rows && pProjectInfo->curOffset > 0) { if (pProjectInfo->curOffset < pInfo->pRes->info.rows && pProjectInfo->curOffset > 0) {
blockDataTrimFirstNRows(pInfo->pRes, pProjectInfo->curOffset); blockDataTrimFirstNRows(pInfo->pRes, pProjectInfo->curOffset);
pProjectInfo->curOffset = 0; pProjectInfo->curOffset = 0;
break;
} else if (pProjectInfo->curOffset >= pInfo->pRes->info.rows) { } else if (pProjectInfo->curOffset >= pInfo->pRes->info.rows) {
pProjectInfo->curOffset -= pInfo->pRes->info.rows; pProjectInfo->curOffset -= pInfo->pRes->info.rows;
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
...@@ -6247,6 +6264,17 @@ _error: ...@@ -6247,6 +6264,17 @@ _error:
return NULL; return NULL;
} }
static SArray* setRowTsColumnOutputInfo(SqlFunctionCtx* pCtx, int32_t numOfCols) {
SArray* pList = taosArrayInit(4, sizeof(int32_t));
for(int32_t i = 0; i < numOfCols; ++i) {
if (fmIsPseudoColumnFunc(pCtx[i].functionId)) {
taosArrayPush(pList, &i);
}
}
return pList;
}
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num,
SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { SSDataBlock* pResBlock, SLimit* pLimit, SExecTaskInfo* pTaskInfo) {
SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo)); SProjectOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SProjectOperatorInfo));
...@@ -6263,6 +6291,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p ...@@ -6263,6 +6291,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str); initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock, pTaskInfo->id.str);
setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo);
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pInfo->binfo.pCtx, numOfCols);
pOperator->name = "ProjectOperator"; pOperator->name = "ProjectOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_PROJECT;
......
...@@ -887,9 +887,9 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { ...@@ -887,9 +887,9 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
// int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1; // int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
SColumnInfoData* pTsOutput = pCtx->pTsOutput; SColumnInfoData* pTsOutput = pCtx->pTsOutput;
TSKEY* tsList = GET_TS_LIST(pCtx); TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
int32_t startOffset = 0; int32_t startOffset = pCtx->offset;
switch (pInputCol->info.type) { switch (pInputCol->info.type) {
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput; SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput;
...@@ -916,10 +916,10 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { ...@@ -916,10 +916,10 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
} else { } else {
colDataAppendInt32(pOutput, pos, &delta); colDataAppendInt32(pOutput, pos, &delta);
} }
}
if (tsList != NULL) { if (tsList != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]); colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
} }
pDiffInfo->prev.i64 = v; pDiffInfo->prev.i64 = v;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册