提交 e28b75b8 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 a63c6198
......@@ -37,7 +37,7 @@ typedef struct SFuncExecEnv {
typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx);
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock, int32_t slotId);
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
typedef struct SScalarFuncExecFuncs {
......@@ -113,7 +113,7 @@ typedef struct SResultRowEntryInfo {
bool initialized:1; // output buffer has been initialized
bool complete:1; // query has completed
uint8_t isNullRes:6; // the result is null
uint8_t numOfRes; // num of output result in current buffer
uint8_t numOfRes; // num of output result in current buffer
} SResultRowEntryInfo;
// determine the real data need to calculated the result
......
......@@ -60,7 +60,7 @@ typedef struct SResultRow {
uint32_t numOfRows; // number of rows of current time window
struct SResultRowEntryInfo* pEntryInfo; // For each result column, there is a resultInfo
STimeWindow win;
char *key; // start key of current result row
char *key; // start key of current result row
} SResultRow;
typedef struct SResultRowPosition {
......
......@@ -608,8 +608,8 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
void doBuildResultDatablock(SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
SDiskbasedBuf* pBuf, int32_t* rowCellOffset, SqlFunctionCtx* pCtx);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
......
......@@ -230,10 +230,10 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
int32_t orderType, int32_t* rowCellOffset);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo);
......@@ -2809,16 +2809,6 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
}
}
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t j = 0; j < numOfOutput; ++j) {
if (pCtx[j].functionId == -1) {
continue;
}
pCtx[j].fpSet.finalize(&pCtx[j]);
}
}
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
......@@ -2841,7 +2831,7 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
}
if (pCtx[j].fpSet.process) { // TODO set the dummy function, to avoid the check for null ptr.
pCtx[j].fpSet.finalize(&pCtx[j]);
// pCtx[j].fpSet.finalize(&pCtx[j]);
}
if (pRow->numOfRows < pResInfo->numOfRes) {
......@@ -2872,7 +2862,7 @@ void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbased
}
if (pCtx[j].fpSet.process) { // TODO set the dummy function.
pCtx[j].fpSet.finalize(&pCtx[j]);
// pCtx[j].fpSet.finalize(&pCtx[j]);
pResInfo->initialized = true;
}
......@@ -3107,7 +3097,8 @@ void setIntervalQueryRange(STableQueryInfo* pTableQueryInfo, TSKEY key, STimeWin
* @param pQInfo
* @param result
*/
int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, int32_t* rowCellOffset) {
int32_t doCopyToSDataBlock(SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx) {
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
......@@ -3125,8 +3116,6 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo*
step = -1;
}
int32_t nrows = pBlock->info.rows;
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
SResultRowPosition* pPos = taosArrayGet(pGroupResInfo->pRows, i);
SFilePage* page = getBufPage(pBuf, pPos->pageId);
......@@ -3139,7 +3128,7 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo*
// TODO copy multiple rows?
int32_t numOfRowsToCopy = pRow->numOfRows;
if (numOfResult + numOfRowsToCopy >= rowCapacity) {
if (numOfResult + numOfRowsToCopy >= pBlock->info.capacity) {
break;
}
......@@ -3148,31 +3137,32 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo*
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset);
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellOffset);
if (pCtx[j].fpSet.process) {
pCtx[j].fpSet.finalize(&pCtx[j], pBlock, slotId);
} else {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, slotId);
char* in = GET_ROWCELL_INTERBUF(pEntryInfo);
colDataAppend(pColInfoData, nrows, in, pEntryInfo->isNullRes);
char* in = GET_ROWCELL_INTERBUF(pCtx[j].resultInfo);
colDataAppend(pColInfoData, pBlock->info.rows, in, pCtx[j].resultInfo->isNullRes);
}
}
releaseBufPage(pBuf, page);
nrows += 1;
numOfResult += numOfRowsToCopy;
if (numOfResult == rowCapacity) { // output buffer is full
pBlock->info.rows += pRow->numOfRows;
if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
break;
}
}
// qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv));
pBlock->info.rows = numOfResult;
blockDataUpdateTsWindow(pBlock);
return 0;
}
void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
int32_t* rowCellOffset) {
void doBuildResultDatablock(SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
int32_t* rowCellOffset, SqlFunctionCtx* pCtx) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
blockDataCleanup(pBlock);
......@@ -3181,7 +3171,7 @@ void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResI
}
int32_t orderType = TSDB_ORDER_ASC;
doCopyToSDataBlock(pBlock, rowCapacity, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset);
doCopyToSDataBlock(pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx);
// add condition (pBlock->info.rows >= 1) just to runtime happy
blockDataUpdateTsWindow(pBlock);
......@@ -4405,7 +4395,7 @@ static void doFinalizeResultImpl(SqlFunctionCtx* pCtx, int32_t numOfExpr) {
// SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
// doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
// } else {
pCtx[j].fpSet.finalize(&pCtx[j]);
// pCtx[j].fpSet.finalize(&pCtx[j]);
}
}
......@@ -4805,7 +4795,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup)
}
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pInfo->pRes, pOperator->resultInfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset);
doBuildResultDatablock(pInfo->pRes, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset, pInfo->pCtx);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
......@@ -5129,8 +5119,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
}
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
doBuildResultDatablock(pBlock, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
doBuildResultDatablock(pBlock, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
......@@ -5149,7 +5138,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
}
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
doBuildResultDatablock(pInfo->binfo.pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
......@@ -5184,7 +5173,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
doBuildResultDatablock(pInfo->binfo.pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
ASSERT(pInfo->binfo.pRes->info.rows > 0);
pOperator->status = OP_RES_TO_RETURN;
......@@ -5229,7 +5218,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pSliceInfo->binfo.resultRowInfo);
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput);
// finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput);
initGroupResInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
......@@ -5286,8 +5275,8 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
OPTR_SET_OPENED(pOperator);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
doBuildResultDatablock(pInfo->binfo.pRes, &pInfo->groupResInfo, pOperator->pExpr,
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
......@@ -5377,7 +5366,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
return NULL;
......@@ -5409,7 +5398,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
......@@ -5426,7 +5415,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
return NULL;
......@@ -5458,7 +5447,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
doBuildResultDatablock(pBInfo->pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset, pInfo->binfo.pCtx);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator);
}
......
......@@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) {
doBuildResultDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
......@@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
while(1) {
doBuildResultDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
......
......@@ -24,7 +24,7 @@ extern "C" {
#include "functionMgt.h"
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
void functionFinalize(SqlFunctionCtx *pCtx);
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow);
bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
......@@ -43,12 +43,12 @@ int32_t maxFunction(SqlFunctionCtx *pCtx);
bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t stddevFunction(SqlFunctionCtx* pCtx);
void stddevFinalize(SqlFunctionCtx* pCtx);
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t percentileFunction(SqlFunctionCtx *pCtx);
void percentileFinalize(SqlFunctionCtx* pCtx);
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
......@@ -60,6 +60,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
int32_t topFunction(SqlFunctionCtx *pCtx);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId);
#ifdef __cplusplus
}
......
......@@ -507,7 +507,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.getEnvFunc = getTopBotFuncEnv,
.initFunc = functionSetup,
.processFunc = topFunction,
.finalizeFunc = functionFinalize
.finalizeFunc = topBotFinalize,
},
{
.name = "bottom",
......
......@@ -49,11 +49,17 @@ bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) {
return true;
}
void functionFinalize(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
cleanupResultRowEntry(pResInfo);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
pResInfo->isNullRes = (pResInfo->numOfRes == 0)? 1:0;
cleanupResultRowEntry(pResInfo);
char* in = GET_ROWCELL_INTERBUF(pResInfo);
colDataAppend(pCol, pBlock->info.rows, in, pResInfo->isNullRes);
return pResInfo->numOfRes;
}
EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) {
......@@ -612,12 +618,11 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
void stddevFinalize(SqlFunctionCtx* pCtx) {
functionFinalize(pCtx);
int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
double avg = pStddevRes->isum / ((double) pStddevRes->count);
pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg);
return functionFinalize(pCtx, pBlock, slotId);
}
typedef struct SPercentileInfo {
......@@ -739,7 +744,7 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
return TSDB_CODE_SUCCESS;
}
void percentileFinalize(SqlFunctionCtx* pCtx) {
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
SVariant* pVal = &pCtx->param[1].param;
double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d;
......@@ -752,7 +757,7 @@ void percentileFinalize(SqlFunctionCtx* pCtx) {
}
tMemBucketDestroy(pMemBucket);
functionFinalize(pCtx);
return functionFinalize(pCtx, pBlock, slotId);
}
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
......@@ -1173,16 +1178,14 @@ typedef struct STopBotResItem {
} STopBotResItem;
typedef struct STopBotRes {
int32_t num;
int32_t pageId;
// int32_t num;
STopBotResItem *pItems;
} STopBotRes;
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0);
int32_t bytes = pColNode->node.resType.bytes;
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * bytes;
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem);
return true;
}
......@@ -1194,13 +1197,14 @@ static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
return pRes;
}
static void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid);
static void doAddIntoResult(STopBotRes* pRes, int32_t maxSize, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock,
uint16_t type, uint64_t uid, SResultRowEntryInfo* pEntryInfo);
int32_t topFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = 0;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STopBotRes *pRes = getTopBotOutputInfo(pCtx);
assert(pRes->num >= 0);
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
// buildTopBotStruct(pRes, pCtx);
......@@ -1221,11 +1225,9 @@ int32_t topFunction(SqlFunctionCtx *pCtx) {
numOfElems++;
char* data = colDataGetData(pCol, i);
doAddIntoResult(pRes, pCtx->param[1].param.i, data, type, pInput->uid);
doAddIntoResult(pRes, pCtx->param[1].param.i, data, i, NULL, type, pInput->uid, pResInfo);
}
// treat the result as only one result
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
......@@ -1256,7 +1258,9 @@ static int32_t topBotResComparFn(const void *p1, const void *p2, const void *par
return (val1->v.d > val2->v.d) ? 1 : -1;
}
void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid) {
void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type,
uint64_t uid, SResultRowEntryInfo* pEntryInfo) {
SVariant val = {0};
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
......@@ -1264,29 +1268,71 @@ void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t ty
assert(pItems != NULL);
// not full yet
if (pRes->num < maxSize) {
STopBotResItem* pItem = &pItems[pRes->num];
if (pEntryInfo->numOfRes < maxSize) {
STopBotResItem* pItem = &pItems[pEntryInfo->numOfRes];
pItem->v = val;
pItem->uid = uid;
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
pRes->num++;
taosheapsort((void *) pItem, sizeof(STopBotResItem), pRes->num, (const void *) &type, topBotResComparFn, false);
if (pRes->pageId == -1) {
SFilePage* pPage = getNewBufPage(NULL, 0, &pRes->pageId);
pPage->num = sizeof(SFilePage);
// keep the current row data
for(int32_t i = 0; i < pSrcBlock->info.numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pSrcBlock->pDataBlock, i);
bool isNull = colDataIsNull_s(pCol, rowIndex);
colDataGetData(pCol, rowIndex);
}
}
// allocate the buffer and keep the data of this row into the new allocated buffer
pEntryInfo->numOfRes++;
taosheapsort((void *) pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void *) &type, topBotResComparFn, false);
} else { // replace the minimum value in the result
if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
(IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) {
STopBotResItem* pItem = &pItems[pRes->num];
STopBotResItem* pItem = &pItems[0];
pItem->v = val;
pItem->uid = uid;
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
taosheapadjust((void *) pItem, sizeof(STopBotResItem), 0, pRes->num - 1, (const void *) &type, topBotResComparFn, NULL, false);
taosheapadjust((void *) pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void *) &type, topBotResComparFn, NULL, false);
}
}
}
void topBotFinalize(SqlFunctionCtx* pCtx) {
functionFinalize(pCtx);
int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) {
SResultRowEntryInfo *pEntryInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo);
pEntryInfo->complete = true;
int32_t type = pCtx->input.pData[0]->info.type;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
// todo assign the tag value and the corresponding row data
int32_t currentRow = pBlock->info.rows;
switch(type) {
case TSDB_DATA_TYPE_INT: {
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
STopBotResItem* pItem = &pRes->pItems[i];
colDataAppendInt32(pCol, currentRow++, (int32_t*)&pItem->v.i);
int32_t pageId = pItem->tuplePos.pageId;
int32_t offset = pItem->tuplePos.offset;
if (pageId != -1) {
// todo
}
}
break;
}
}
return pEntryInfo->numOfRes;
// return functionFinalize(pCtx, pBlock, slotId);
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册