提交 f54f210e 编写于 作者: H Haojun Liao

[td-2895] refactor.

上级 b9867fab
......@@ -3487,9 +3487,6 @@ static void diff_function(SQLFunctionCtx *pCtx) {
int32_t forwardStep = (isFirstBlock) ? notNullElems - 1 : notNullElems;
GET_RES_INFO(pCtx)->numOfRes += forwardStep;
pCtx->pOutput += forwardStep * pCtx->outputBytes;
pCtx->ptsOutputBuf = (char*)pCtx->ptsOutputBuf + forwardStep * TSDB_KEYSIZE;
}
}
......
......@@ -324,6 +324,13 @@ int64_t getNumOfResult_rv(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, i
return maxOutput;
}
static void setNumOfRes(SQLFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t j = 0; j < numOfOutput; ++j) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(&pCtx[j]);
pResInfo->numOfRes = 0;
}
}
bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) {
if (pGroupbyExpr == NULL || pGroupbyExpr->numOfGroupCols == 0) {
return false;
......@@ -3559,16 +3566,20 @@ void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx* pCtx, SR
}
void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) {
SSDataBlock* pDataBlock = pInfo->binfo.pRes;
if (pInfo->bufCapacity < pDataBlock->info.rows + numOfInputRows) {
int32_t newSize = pDataBlock->info.rows + numOfInputRows;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SSDataBlock* pDataBlock = pBInfo->pRes;
int32_t newSize = pDataBlock->info.rows + numOfInputRows;
if (pInfo->bufCapacity < newSize) {
for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
char* p = realloc(pColInfo->pData, newSize * pColInfo->info.bytes);
if (p != NULL) {
pColInfo->pData = p;
// it starts from the tail of the previously generated results.
pBInfo->pCtx[i].pOutput = pColInfo->pData;
pInfo->bufCapacity = newSize;
} else {
// longjmp
}
......@@ -3577,7 +3588,13 @@ void updateOutputBuf(SArithOperatorInfo* pInfo, int32_t numOfInputRows) {
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock->pDataBlock, i);
pInfo->binfo.pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows;
pBInfo->pCtx[i].pOutput = pColInfo->pData + pColInfo->info.bytes * pDataBlock->info.rows;
// re-estabilish output buffer pointer.
int32_t functionId = pBInfo->pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[0].pOutput;
}
}
}
......@@ -3697,8 +3714,6 @@ void prepareRepeatTableScan(SQueryRuntimeEnv* pRuntimeEnv) {
}
}
}
#endif
void initCtxOutputBuf_rv(SQLFunctionCtx* pCtx, int32_t size) {
......@@ -6383,12 +6398,14 @@ static SSDataBlock* doArithmeticOperation(void* param) {
updateOutputBuf(pArithInfo, pBlock->info.rows);
arithmeticApplyFunctions(pRuntimeEnv, pArithInfo->binfo.pCtx, pOperator->numOfOutput);
pArithInfo->binfo.pRes->info.rows += pBlock->info.rows;
pArithInfo->binfo.pRes->info.rows = getNumOfResult_rv(pRuntimeEnv, pArithInfo->binfo.pCtx, pOperator->numOfOutput);
if (pArithInfo->binfo.pRes->info.rows >= 4096) {
break;
}
}
setNumOfRes(pArithInfo->binfo.pCtx, pOperator->numOfOutput);
if (pArithInfo->binfo.pRes->info.rows > 0) {
return pArithInfo->binfo.pRes;
} else {
......@@ -6414,8 +6431,12 @@ static SSDataBlock* doLimit(void* param) {
if (pInfo->total + pBlock->info.rows >= pInfo->limit) {
pBlock->info.rows = (pInfo->limit - pInfo->total);
pInfo->total = pInfo->limit;
setQueryStatus(pOperator->pRuntimeEnv->pQuery, QUERY_COMPLETED);
pOperator->completed = true;
} else {
pInfo->total += pBlock->info.rows;
}
return pBlock;
......@@ -6753,8 +6774,9 @@ SOperatorInfo* createStableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
SOperatorInfo* createArithOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SArithOperatorInfo* pInfo = calloc(1, sizeof(SArithOperatorInfo));
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->bufCapacity = 4096;
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput);
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
setDefaultOutputBuf(pRuntimeEnv, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.pRes, pInfo->binfo.rowCellInfoOffset);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册