提交 78b1bf27 编写于 作者: H Haojun Liao

[td-14393] refactor.

上级 ddc63b9f
......@@ -194,7 +194,8 @@ typedef struct SqlFunctionCtx {
int32_t numOfParams;
SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list
void *ptsOutputBuf; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset;
SVariant tag;
struct SResultRowEntryInfo *resultInfo;
SSubsidiaryResInfo subsidiaryRes;
......
......@@ -1264,17 +1264,21 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData
taosArrayDestroy(pBlockList);
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
ASSERT(!fmIsAggFunc(pCtx->functionId));
ASSERT(!fmIsAggFunc(pCtx[k].functionId));
if (fmIsNonstandardSQLFunc(pCtx->functionId)) {
if (fmIsPseudoColumnFunc(pCtx[k].functionId)) {
// TODO: set the correct _rowts column output buffer, there may be multiple _rowts columns
} else if (fmIsNonstandardSQLFunc(pCtx[k].functionId)) {
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
pCtx[k].ptsList = 0;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[k]);
pCtx[k].fpSet.init(&pCtx[k], pResInfo);
pCtx[k].pOutput = pColInfoData->pData;
pCtx[k].pOutput = (char*)pColInfoData;
// pCtx[k].pTsOutput =
int32_t numOfRows = pCtx[k].fpSet.process(&pCtx[k]);
pResult->info.rows = numOfRows;
pResult->info.rows += numOfRows;
} else {
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
taosArrayPush(pBlockList, &pSrcBlock);
......@@ -1284,7 +1288,6 @@ static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSData
scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
pResult->info.rows = dest.numOfRows;
taosArrayDestroy(pBlockList);
}
} else {
......@@ -1926,12 +1929,12 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num
pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
pCtx->ptsOutputBuf = NULL;
pCtx->pTsOutput = NULL;
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
pCtx->resDataInfo.type = pFunct->resSchema.type;
pCtx->order = TSDB_ORDER_ASC;
pCtx->resDataInfo.type = pFunct->resSchema.type;
pCtx->order = TSDB_ORDER_ASC;
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
#if 0
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
// int16_t type = pFunct->param[j].nType;
......@@ -2983,7 +2986,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
// set the timestamp output buffer for top/bottom/diff query
// int32_t fid = pCtx[i].functionId;
// if (fid == FUNCTION_TOP || fid == FUNCTION_BOTTOM || fid == FUNCTION_DIFF || fid == FUNCTION_DERIVATIVE) {
// if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i-1].pOutput;
// if (i > 0) pCtx[i].pTsOutput = pCtx[i-1].pOutput;
// }
}
......@@ -3020,7 +3023,7 @@ void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOf
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF ||
functionId == FUNCTION_DERIVATIVE) {
if (i > 0) pBInfo->pCtx[i].ptsOutputBuf = pBInfo->pCtx[i - 1].pOutput;
// if (i > 0) pBInfo->pCtx[i].pTsOutput = pBInfo->pCtx[i - 1].pOutput;
}
}
}
......@@ -3058,7 +3061,7 @@ void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput)
void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size) {
for (int32_t j = 0; j < size; ++j) {
struct SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]);
if (isRowEntryInitialized(pResInfo) || pCtx[j].functionId == -1) {
if (isRowEntryInitialized(pResInfo) || fmIsPseudoColumnFunc(pCtx[j].functionId) || pCtx[j].functionId == -1 || fmIsScalarFunc(pCtx[j].functionId)) {
continue;
}
......@@ -3224,7 +3227,7 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv* pRuntimeEnv, SResultRow* pRes
}
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
// if (i > 0) pCtx[i].pTsOutput = pCtx[i - 1].pOutput;
}
// if (!pResInfo->initialized) {
......
......@@ -387,7 +387,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_ROWTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
.checkFunc = stubCheckAndGetResultType,
.getEnvFunc = NULL,
.getEnvFunc = getTimePseudoFuncEnv,
.initFunc = NULL,
.sprocessFunc = NULL,
.finalizeFunc = NULL
......@@ -504,6 +504,7 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
break;
}
case FUNCTION_TYPE_ROWTS:
case FUNCTION_TYPE_QSTARTTS:
case FUNCTION_TYPE_QENDTS:
case FUNCTION_TYPE_WSTARTTS:
......@@ -589,7 +590,6 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
break;
}
case FUNCTION_TYPE_ROWTS:
case FUNCTION_TYPE_TBNAME: {
// todo
break;
......
......@@ -847,8 +847,10 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) {
}
typedef struct SDiffInfo {
bool valueAssigned;
bool hasPrev;
bool includeNull;
bool ignoreNegative;
bool firstOutput;
union { int64_t i64; double d64;} prev;
} SDiffInfo;
......@@ -863,9 +865,11 @@ bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo) {
}
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
pDiffInfo->valueAssigned = false;
pDiffInfo->prev.i64 = 0;
pDiffInfo->hasPrev = false;
pDiffInfo->prev.i64 = 0;
pDiffInfo->ignoreNegative = false; // TODO set correct param
pDiffInfo->includeNull = false;
pDiffInfo->firstOutput = false;
return true;
}
......@@ -876,65 +880,79 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
bool isFirstBlock = (pDiffInfo->valueAssigned == false);
bool isFirstBlock = (pDiffInfo->hasPrev == false);
int32_t numOfElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
// int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
TSKEY* pTimestamp = pCtx->ptsOutputBuf;
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
TSKEY* tsList = GET_TS_LIST(pCtx);
int32_t startOffset = 0;
switch (pInputCol->info.type) {
case TSDB_DATA_TYPE_INT: {
int32_t *pOutput = (int32_t *)pCtx->pOutput;
SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
int32_t pos = startOffset + (isFirstBlock? (numOfElems-1):numOfElems);
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
if (pDiffInfo->includeNull) {
colDataSetNull_f(pOutput->nullbitmap, pos);
if (tsList != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
numOfElems += 1;
}
continue;
}
int32_t v = *(int32_t*) colDataGetData(pInputCol, i);
if (pDiffInfo->valueAssigned) {
int64_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null
if (pDiffInfo->ignoreNegative) {
continue;
if (pDiffInfo->hasPrev) {
int32_t delta = (int32_t)(v - pDiffInfo->prev.i64); // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
colDataSetNull_f(pOutput->nullbitmap, pos);
} else {
colDataAppendInt32(pOutput, pos, &delta);
}
}
*(pOutput++) = delta;
// *pTimestamp = (tsList != NULL)? tsList[i]:0;
pTimestamp += 1;
if (tsList != NULL) {
colDataAppendInt64(pTsOutput, pos, &tsList[i]);
}
pDiffInfo->prev.i64 = v;
pDiffInfo->valueAssigned = true;
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
}
case TSDB_DATA_TYPE_BIGINT: {
int64_t *pOutput = (int64_t *)pCtx->pOutput;
SColumnInfoData *pOutput = (SColumnInfoData *)pCtx->pOutput;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue;
}
int32_t v = 0;
if (pDiffInfo->valueAssigned) {
if (pDiffInfo->hasPrev) {
v = *(int64_t*) colDataGetData(pInputCol, i);
int64_t delta = (int64_t)(v - pDiffInfo->prev.i64); // direct previous may be null
if (pDiffInfo->ignoreNegative) {
continue;
}
*(pOutput++) = delta;
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
pTimestamp += 1;
// *(pOutput++) = delta;
// *pTimestamp = (tsList != NULL)? tsList[i]:0;
//
// pOutput += 1;
// pTimestamp += 1;
}
pDiffInfo->prev.i64 = v;
pDiffInfo->valueAssigned = true;
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
......@@ -952,7 +970,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
continue;
}
if (pDiffInfo->valueAssigned) { // initial value is not set yet
if (pDiffInfo->hasPrev) { // initial value is not set yet
SET_DOUBLE_VAL(pOutput, pData[i] - pDiffInfo->d64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
......@@ -960,7 +978,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
}
pDiffInfo->d64Prev = pData[i];
pDiffInfo->valueAssigned = true;
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
......@@ -977,7 +995,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
continue;
}
if (pDiffInfo->valueAssigned) { // initial value is not set yet
if (pDiffInfo->hasPrev) { // initial value is not set yet
*pOutput = (float)(pData[i] - pDiffInfo->d64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
......@@ -985,7 +1003,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
}
pDiffInfo->d64Prev = pData[i];
pDiffInfo->valueAssigned = true;
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
......@@ -1002,7 +1020,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
continue;
}
if (pDiffInfo->valueAssigned) { // initial value is not set yet
if (pDiffInfo->hasPrev) { // initial value is not set yet
*pOutput = (int16_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
......@@ -1010,7 +1028,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
}
pDiffInfo->i64Prev = pData[i];
pDiffInfo->valueAssigned = true;
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
......@@ -1028,7 +1046,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
continue;
}
if (pDiffInfo->valueAssigned) { // initial value is not set yet
if (pDiffInfo->hasPrev) { // initial value is not set yet
*pOutput = (int8_t)(pData[i] - pDiffInfo->i64Prev); // direct previous may be null
*pTimestamp = (tsList != NULL)? tsList[i]:0;
pOutput += 1;
......@@ -1036,7 +1054,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
}
pDiffInfo->i64Prev = pData[i];
pDiffInfo->valueAssigned = true;
pDiffInfo->hasPrev = true;
numOfElems++;
}
break;
......@@ -1048,7 +1066,7 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
}
// initial value is not set yet
if (!pDiffInfo->valueAssigned || numOfElems <= 0) {
if (!pDiffInfo->hasPrev || numOfElems <= 0) {
/*
* 1. current block and blocks before are full of null
* 2. current block may be null value
......@@ -1064,7 +1082,6 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
int32_t forwardStep = (isFirstBlock) ? numOfElems - 1 : numOfElems;
return forwardStep;
// pResInfo->numOfRes += forwardStep;
}
}
......@@ -1902,10 +1902,10 @@ static void copyTopBotRes(SqlFunctionCtx *pCtx, int32_t type) {
}
// set the output timestamp of each record.
TSKEY *output = pCtx->ptsOutputBuf;
for (int32_t i = 0; i < len; ++i, output += step) {
*output = tvp[i]->timestamp;
}
// TSKEY *output = pCtx->pTsOutput;
// for (int32_t i = 0; i < len; ++i, output += step) {
// *output = tvp[i]->timestamp;
// }
// set the corresponding tag data for each record
// todo check malloc failure
......@@ -2687,7 +2687,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) {
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
TSKEY *pTimestamp = pCtx->ptsOutputBuf;
TSKEY *pTimestamp = NULL;//pCtx->pTsOutput;
TSKEY *tsList = GET_TS_LIST(pCtx);
double *pOutput = (double *)pCtx->pOutput;
......@@ -2867,7 +2867,7 @@ static void deriv_function(SqlFunctionCtx *pCtx) {
} else { \
*(type *)(ctx)->pOutput = *(type *)(d) - (*(type *)(&(ctx)->param[1].i)); \
*(type *)(&(ctx)->param[1].i) = *(type *)(d); \
*(int64_t *)(ctx)->ptsOutputBuf = GET_TS_DATA(ctx, index); \
*(int64_t *)(ctx)->pTsOutput = GET_TS_DATA(ctx, index); \
} \
} while (0);
......@@ -2881,7 +2881,7 @@ static void diff_function(SqlFunctionCtx *pCtx) {
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
TSKEY* pTimestamp = pCtx->ptsOutputBuf;
TSKEY* pTimestamp = NULL;//pCtx->pTsOutput;
TSKEY* tsList = GET_TS_LIST(pCtx);
switch (pCtx->inputType) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册