提交 70927458 编写于 作者: H Haojun Liao

refactor(query): do some internal refactor.

上级 1652cd0e
...@@ -61,9 +61,6 @@ typedef struct SFileBlockInfo { ...@@ -61,9 +61,6 @@ typedef struct SFileBlockInfo {
#define TSDB_BLOCK_DIST_STEP_ROWS 8 #define TSDB_BLOCK_DIST_STEP_ROWS 8
#define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results #define MAX_INTERVAL_TIME_WINDOW 1000000 // maximum allowed time windows in final results
#define FUNCTION_TYPE_SCALAR 1
#define FUNCTION_TYPE_AGG 2
#define TOP_BOTTOM_QUERY_LIMIT 100 #define TOP_BOTTOM_QUERY_LIMIT 100
#define FUNCTIONS_NAME_MAX_LENGTH 16 #define FUNCTIONS_NAME_MAX_LENGTH 16
...@@ -165,9 +162,6 @@ enum { ...@@ -165,9 +162,6 @@ enum {
typedef struct tExprNode { typedef struct tExprNode {
int32_t nodeType; int32_t nodeType;
union { union {
SSchema *pSchema;// column node
struct SVariant *pVal; // value node
struct {// function node struct {// function node
char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor
int32_t functionId; int32_t functionId;
...@@ -210,47 +204,23 @@ struct SScalarParam { ...@@ -210,47 +204,23 @@ struct SScalarParam {
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength, int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, SResultDataInfo* pInfo, int16_t extLength,
bool isSuperTable); bool isSuperTable);
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);
void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num); void resetResultRowEntryResult(SqlFunctionCtx* pCtx, int32_t num);
void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell); void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell);
int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock); int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock);
bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry); bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry);
bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry); bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry);
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// fill api
struct SFillInfo;
struct SFillColInfo;
typedef struct SPoint { typedef struct SPoint {
int64_t key; int64_t key;
void * val; void * val;
} SPoint; } SPoint;
//void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
//void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
//void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
//struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const SValueNode* val);
//bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
//
//struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
// SInterval* pInterval, int32_t fillType,
// struct SFillColInfo* pCol, const char* id);
//
//void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
//int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
//int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType); int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// udf api // udf api
struct SUdfInfo; struct SUdfInfo;
void qAddUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo);
void qRemoveUdfInfo(uint64_t id, struct SUdfInfo* pUdfInfo);
/** /**
* create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf * create udfd proxy, called once in process that call doSetupUdf/callUdfxxx/doTeardownUdf
* @return error code * @return error code
......
...@@ -355,13 +355,18 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) ...@@ -355,13 +355,18 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex)
} }
int32_t index = (tsColumnIndex == -1) ? 0 : tsColumnIndex; int32_t index = (tsColumnIndex == -1) ? 0 : tsColumnIndex;
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index);
if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) { if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
return 0; return 0;
} }
pDataBlock->info.window.skey = *(TSKEY*)colDataGetData(pColInfoData, 0); TSKEY skey = *(TSKEY*)colDataGetData(pColInfoData, 0);
pDataBlock->info.window.ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1)); TSKEY ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1));
pDataBlock->info.window.skey = TMIN(skey, ekey);
pDataBlock->info.window.ekey = TMAX(skey, ekey);
return 0; return 0;
} }
......
...@@ -84,7 +84,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); ...@@ -84,7 +84,6 @@ size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size); int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size);
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo);
void closeAllResultRows(SResultRowInfo* pResultRowInfo); void closeAllResultRows(SResultRowInfo* pResultRowInfo);
void initResultRow(SResultRow *pResultRow); void initResultRow(SResultRow *pResultRow);
...@@ -93,15 +92,6 @@ bool isResultRowClosed(SResultRow* pResultRow); ...@@ -93,15 +92,6 @@ bool isResultRowClosed(SResultRow* pResultRow);
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset); struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset);
static FORCE_INLINE SResultRow *getResultRow(SDiskbasedBuf* pBuf, SResultRowInfo *pResultRowInfo, int32_t slot) {
ASSERT(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
SResultRowPosition* pos = &pResultRowInfo->pPosition[slot];
SFilePage* bufPage = (SFilePage*) getBufPage(pBuf, pos->pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset);
return pRow;
}
static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) { static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRowPosition* pos) {
SFilePage* bufPage = (SFilePage*) getBufPage(pBuf, pos->pageId); SFilePage* bufPage = (SFilePage*) getBufPage(pBuf, pos->pageId);
SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset); SResultRow* pRow = (SResultRow*)((char*)bufPage + pos->offset);
......
...@@ -101,20 +101,8 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow ...@@ -101,20 +101,8 @@ void resetResultRowInfo(STaskRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRow
pResultRowInfo->size = 0; pResultRowInfo->size = 0;
} }
int32_t numOfClosedResultRows(SResultRowInfo *pResultRowInfo) {
int32_t i = 0;
// while (i < pResultRowInfo->size && pResultRowInfo->pResult[i]->closed) {
// ++i;
// }
return i;
}
void closeAllResultRows(SResultRowInfo *pResultRowInfo) { void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); // do nothing
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
}
} }
bool isResultRowClosed(SResultRow* pRow) { bool isResultRowClosed(SResultRow* pRow) {
......
...@@ -345,6 +345,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -345,6 +345,8 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
// pResultRowInfo object. // pResultRowInfo object.
if (p1 != NULL) { if (p1 != NULL) {
// todo
pResult = getResultRowByPos(pResultBuf, p1); pResult = getResultRowByPos(pResultBuf, p1);
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
} }
...@@ -353,11 +355,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR ...@@ -353,11 +355,9 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
// 1. close current opened time window // 1. close current opened time window
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId && if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId &&
pResult->offset != pResultRowInfo->cur.offset))) { pResult->offset != pResultRowInfo->cur.offset))) {
// todo extract function
SResultRowPosition pos = pResultRowInfo->cur; SResultRowPosition pos = pResultRowInfo->cur;
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId); SFilePage*
SResultRow* pRow = (SResultRow*)((char*)pPage + pos.offset); pPage = getBufPage(pResultBuf, pos.pageId);
closeResultRow(pRow);
releaseBufPage(pResultBuf, pPage); releaseBufPage(pResultBuf, pPage);
} }
...@@ -552,11 +552,13 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow ...@@ -552,11 +552,13 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey); colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
} }
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
int32_t numOfTotal, int32_t numOfOutput, int32_t order) { int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
// keep it temporarily // keep it temporarily
// todo no need this??
bool hasAgg = pCtx[k].input.colDataAggIsSet; bool hasAgg = pCtx[k].input.colDataAggIsSet;
int32_t numOfRows = pCtx[k].input.numOfRows; int32_t numOfRows = pCtx[k].input.numOfRows;
int32_t startOffset = pCtx[k].input.startRowIndex; int32_t startOffset = pCtx[k].input.startRowIndex;
...@@ -576,6 +578,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow ...@@ -576,6 +578,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) { if (fmIsWindowPseudoColumnFunc(pCtx[k].functionId)) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(&pCtx[k]);
char* p = GET_ROWCELL_INTERBUF(pEntryInfo); char* p = GET_ROWCELL_INTERBUF(pEntryInfo);
SColumnInfoData idata = {0}; SColumnInfoData idata = {0};
...@@ -587,11 +590,11 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow ...@@ -587,11 +590,11 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData}; SScalarParam tw = {.numOfRows = 5, .columnData = pTimeWindowData};
pCtx[k].sfp.process(&tw, 1, &out); pCtx[k].sfp.process(&tw, 1, &out);
pEntryInfo->numOfRes = 1; pEntryInfo->numOfRes = 1;
continue; } else {
}
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) { if (functionNeedToExecute(&pCtx[k]) && pCtx[k].fpSet.process != NULL) {
code = pCtx[k].fpSet.process(&pCtx[k]); code = pCtx[k].fpSet.process(&pCtx[k]);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code)); qError("%s apply functions error, code: %s", GET_TASKID(taskInfo), tstrerror(code));
taskInfo->code = code; taskInfo->code = code;
...@@ -604,6 +607,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow ...@@ -604,6 +607,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
pCtx[k].input.startRowIndex = startOffset; pCtx[k].input.startRowIndex = startOffset;
pCtx[k].input.numOfRows = numOfRows; pCtx[k].input.numOfRows = numOfRows;
} }
}
} }
static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr* pQueryAttr, STimeWindow* pWindow) { static FORCE_INLINE TSKEY reviseWindowEkey(STaskAttr* pQueryAttr, STimeWindow* pWindow) {
...@@ -741,7 +745,10 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct ...@@ -741,7 +745,10 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct
for (int32_t k = 0; k < pOperator->numOfExprs; ++k) { for (int32_t k = 0; k < pOperator->numOfExprs; ++k) {
if (functionNeedToExecute(&pCtx[k])) { if (functionNeedToExecute(&pCtx[k])) {
// todo add a dummy funtion to avoid process check // todo add a dummy funtion to avoid process check
if (pCtx[k].fpSet.process != NULL) { if (pCtx[k].fpSet.process == NULL) {
continue;
}
int32_t code = pCtx[k].fpSet.process(&pCtx[k]); int32_t code = pCtx[k].fpSet.process(&pCtx[k]);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code)); qError("%s aggregate function error happens, code: %s", GET_TASKID(pOperator->pTaskInfo), tstrerror(code));
...@@ -749,7 +756,6 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct ...@@ -749,7 +756,6 @@ static int32_t doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunct
} }
} }
} }
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -534,8 +534,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, ...@@ -534,8 +534,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset); pInfo->pPseudoCtx = createSqlFunctionCtx(pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, &pInfo->rowCellInfoOffset);
} }
// pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]}; pInfo->scanInfo = (SScanInfo){.numOfAsc = pTableScanNode->scanSeq[0], .numOfDesc = pTableScanNode->scanSeq[1]};
pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose // pInfo->scanInfo = (SScanInfo){.numOfAsc = 0, .numOfDesc = 1}; // for debug purpose
pInfo->readHandle = *readHandle; pInfo->readHandle = *readHandle;
pInfo->interval = extractIntervalInfo(pTableScanNode); pInfo->interval = extractIntervalInfo(pTableScanNode);
......
...@@ -12,6 +12,11 @@ typedef enum SResultTsInterpType { ...@@ -12,6 +12,11 @@ typedef enum SResultTsInterpType {
static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator); static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator);
static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator); static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator);
static int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo);
static SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult);
static void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult);
/* /*
* There are two cases to handle: * There are two cases to handle:
* *
...@@ -135,8 +140,10 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo ...@@ -135,8 +140,10 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
// set time window for current result // set time window for current result
pResultRow->win = (*win); pResultRow->win = (*win);
*pResult = pResultRow; *pResult = pResultRow;
setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowCellInfoOffset); setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -164,38 +171,38 @@ static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsL ...@@ -164,38 +171,38 @@ static void doKeepNewWindowStartInfo(SWindowRowsSup* pRowSup, const int64_t* tsL
static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey, static FORCE_INLINE int32_t getForwardStepsInBlock(int32_t numOfRows, __block_search_fn_t searchFn, TSKEY ekey,
int16_t pos, int16_t order, int64_t* pData) { int16_t pos, int16_t order, int64_t* pData) {
int32_t forwardStep = 0; int32_t forwardRows = 0;
if (order == TSDB_ORDER_ASC) { if (order == TSDB_ORDER_ASC) {
int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order); int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
if (end >= 0) { if (end >= 0) {
forwardStep = end; forwardRows = end;
if (pData[end + pos] == ekey) { if (pData[end + pos] == ekey) {
forwardStep += 1; forwardRows += 1;
} }
} }
} else { } else {
int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order); int32_t end = searchFn((char*)&pData[pos], numOfRows - pos, ekey, order);
if (end >= 0) { if (end >= 0) {
forwardStep = end; forwardRows = end;
if (pData[end + pos] == ekey) { if (pData[end + pos] == ekey) {
forwardStep += 1; forwardRows += 1;
} }
} }
// int32_t end = searchFn((char*)pData, pos + 1, ekey, order); // int32_t end = searchFn((char*)pData, pos + 1, ekey, order);
// if (end >= 0) { // if (end >= 0) {
// forwardStep = pos - end; // forwardRows = pos - end;
// //
// if (pData[end] == ekey) { // if (pData[end] == ekey) {
// forwardStep += 1; // forwardRows += 1;
// } // }
// } // }
} }
assert(forwardStep >= 0); assert(forwardRows >= 0);
return forwardStep; return forwardRows;
} }
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) { int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
...@@ -431,10 +438,7 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in ...@@ -431,10 +438,7 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in
static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo *pInfo, SqlFunctionCtx* pCtx, int32_t numOfExprs, int32_t pos, static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo *pInfo, SqlFunctionCtx* pCtx, int32_t numOfExprs, int32_t pos,
SSDataBlock* pBlock, const TSKEY* tsCols, STimeWindow* win) { SSDataBlock* pBlock, const TSKEY* tsCols, STimeWindow* win) {
int32_t numOfRows = pBlock->info.rows;
bool ascQuery = (pInfo->order == TSDB_ORDER_ASC); bool ascQuery = (pInfo->order == TSDB_ORDER_ASC);
int32_t step = 1; // GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
TSKEY curTs = tsCols[pos]; TSKEY curTs = tsCols[pos];
...@@ -449,14 +453,15 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo *pInfo, S ...@@ -449,14 +453,15 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo *pInfo, S
return true; return true;
} }
if (lastTs == INT64_MIN && ((pos == 0 && ascQuery) || (pos == ( - 1) && !ascQuery))) { // it is the first time window, no need to do interpolation
if (pTsKey->isNull && pos == 0) {
setNotInterpoWindowKey(pCtx, numOfExprs, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pCtx, numOfExprs, RESULT_ROW_START_INTERP);
return true; } else {
TSKEY prevTs = ((pos == 0) ? lastTs : tsCols[pos - 1]);
doTimeWindowInterpolation(pInfo, numOfExprs, pBlock->pDataBlock, prevTs, pos - 1, curTs, pos, key,
RESULT_ROW_START_INTERP);
} }
TSKEY prevTs = ((pos == 0 && ascQuery) || (pos == (numOfRows - 1) && !ascQuery)) ? lastTs : tsCols[pos - step];
doTimeWindowInterpolation(pInfo, numOfExprs, pBlock->pDataBlock, prevTs, pos - step, curTs, pos, key, RESULT_ROW_START_INTERP);
return true; return true;
} }
...@@ -473,14 +478,13 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo *pInfo, Sql ...@@ -473,14 +478,13 @@ static bool setTimeWindowInterpolationEndTs(SIntervalAggOperatorInfo *pInfo, Sql
return false; return false;
} }
// there is actual end point of current time window, no interpolation need // there is actual end point of current time window, no interpolation needs
if (key == actualEndKey) { if (key == actualEndKey) {
setNotInterpoWindowKey(pCtx, numOfExprs, RESULT_ROW_END_INTERP); setNotInterpoWindowKey(pCtx, numOfExprs, RESULT_ROW_END_INTERP);
return true; return true;
} }
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order); int32_t nextRowIndex = endRowIndex + 1;
int32_t nextRowIndex = endRowIndex + step;
assert(nextRowIndex >= 0); assert(nextRowIndex >= 0);
TSKEY nextKey = tsCols[nextRowIndex]; TSKEY nextKey = tsCols[nextRowIndex];
...@@ -575,27 +579,23 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) { ...@@ -575,27 +579,23 @@ static void setResultRowInterpo(SResultRow* pResult, SResultTsInterpType type) {
} }
static void doWindowBorderInterpolation(SIntervalAggOperatorInfo *pInfo, SSDataBlock* pBlock, int32_t numOfExprs, SqlFunctionCtx* pCtx, static void doWindowBorderInterpolation(SIntervalAggOperatorInfo *pInfo, SSDataBlock* pBlock, int32_t numOfExprs, SqlFunctionCtx* pCtx,
SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardStep) { SResultRow* pResult, STimeWindow* win, int32_t startPos, int32_t forwardRows) {
if (!pInfo->timeWindowInterpo) { if (!pInfo->timeWindowInterpo) {
return; return;
} }
assert(pBlock != NULL); ASSERT(pBlock != NULL);
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pInfo->order);
if (pBlock->pDataBlock == NULL) { if (pBlock->pDataBlock == NULL) {
// tscError("pBlock->pDataBlock == NULL"); // tscError("pBlock->pDataBlock == NULL");
return; return;
} }
// todo set the correct primary timestamp column index SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, 0);
TSKEY* tsCols = (TSKEY*)(pColInfo->pData); TSKEY* tsCols = (TSKEY*)(pColInfo->pData);
bool done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP); bool done = isResultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
if (!done) { // it is not interpolated, now start to generated the interpolated value if (!done) { // it is not interpolated, now start to generated the interpolated value
int32_t startRowIndex = startPos; bool interp = setTimeWindowInterpolationStartTs(pInfo, pCtx, numOfExprs, startPos, pBlock, tsCols, win);
bool interp = setTimeWindowInterpolationStartTs(pInfo, pCtx, numOfExprs, startRowIndex, pBlock, tsCols, win);
if (interp) { if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP); setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
} }
...@@ -611,7 +611,7 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo *pInfo, SSDataB ...@@ -611,7 +611,7 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo *pInfo, SSDataB
// interpolation query does not generate the time window end interpolation // interpolation query does not generate the time window end interpolation
done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP); done = isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!done) { if (!done) {
int32_t endRowIndex = startPos + (forwardStep - 1) * step; int32_t endRowIndex = startPos + forwardRows - 1;
TSKEY endKey = (pInfo->order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey; TSKEY endKey = (pInfo->order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
bool interp = bool interp =
...@@ -654,88 +654,29 @@ static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, S ...@@ -654,88 +654,29 @@ static void saveDataBlockLastRow(SArray* pPrevKeys, const SSDataBlock* pBlock, S
} }
} }
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock, static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t numOfExprs, SResultRowInfo* pResultRowInfo,
uint64_t tableGroupId) { SSDataBlock* pBlock, int32_t scanFlag, int64_t* tsCols, SResultRowPosition* p) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
int32_t numOfOutput = pOperatorInfo->numOfExprs;
SArray* pUpdated = NULL;
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
pUpdated = taosArrayInit(4, POINTER_BYTES);
}
int32_t step = 1;
bool ascScan = (pInfo->order == TSDB_ORDER_ASC);
// int32_t prevIndex = pResultRowInfo->curPos;
TSKEY* tsCols = NULL;
if (pBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
if (tsCols != NULL) { SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
}
}
int32_t startPos = 0; int32_t startPos = 0;
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols, pBlock->info.rows, ascScan); int32_t numOfOutput = pOperatorInfo->numOfExprs;
uint64_t groupId = pBlock->info.groupId;
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, &pInfo->win);
bool masterScan = true;
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey;
taosArrayPush(pUpdated, &pos);
}
int32_t forwardStep = 0;
TSKEY ekey = ascScan? win.ekey:win.skey;
forwardStep =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
ASSERT(forwardStep > 0);
// prev time window not interpolation yet.
SResultRowPosition pos = {0};
if (pInfo->timeWindowInterpo) {
pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
SResultRowPosition* px = (SResultRowPosition*)pn->data;
// do nothing
if (pn != NULL && px->pageId == pos.pageId && px->offset == pos.offset) {
} else {
tdListAppend(pResultRowInfo->openWindow, &pos);
}
}
if (pInfo->timeWindowInterpo) {
while (1) { while (1) {
SListNode* pn = tdListGetHead(pResultRowInfo->openWindow); SListNode* pn = tdListGetHead(pResultRowInfo->openWindow);
SResultRowPosition* p1 = (SResultRowPosition*)pn->data; SResultRowPosition* p1 = (SResultRowPosition*)pn->data;
if (p1->pageId == pos.pageId && p1->offset == pos.offset) { if (p->pageId == p1->pageId && p->offset == p1->offset) {
break; break;
} }
SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1); SResultRow* pr = getResultRowByPos(pInfo->aggSup.pResultBuf, p1);
ASSERT(pr->offset == p1->offset && pr->pageId == p1->pageId);
if (pr->closed) { if (pr->closed) {
ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) && isResultRowInterpolated(pr, RESULT_ROW_END_INTERP)); ASSERT(isResultRowInterpolated(pr, RESULT_ROW_START_INTERP) && isResultRowInterpolated(pr, RESULT_ROW_END_INTERP));
tdListPopHead(pResultRowInfo->openWindow); tdListPopHead(pResultRowInfo->openWindow);
...@@ -743,7 +684,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -743,7 +684,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
} }
STimeWindow w = pr->win; STimeWindow w = pr->win;
ret = setTimeWindowOutputBuf(pResultRowInfo, &w, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &w, (scanFlag == MAIN_SCAN), &pResult, groupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -757,32 +698,86 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -757,32 +698,86 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
w.ekey, RESULT_ROW_END_INTERP); w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->binfo.pCtx, pOperatorInfo->numOfExprs, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pInfo->binfo.pCtx, numOfExprs, RESULT_ROW_START_INTERP);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, 0, tsCols,
pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pBlock->info.rows, numOfExprs, pInfo->order);
if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
closeResultRow(pr);
tdListPopHead(pResultRowInfo->openWindow); tdListPopHead(pResultRowInfo->openWindow);
} else { // the remains are can not be closed yet.
break;
}
}
}
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pBlock,
int32_t scanFlag) {
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)pOperatorInfo->info;
SArray* pUpdated = NULL;
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
pUpdated = taosArrayInit(4, POINTER_BYTES);
}
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
int32_t startPos = 0;
int32_t numOfOutput = pOperatorInfo->numOfExprs;
int64_t *tsCols = extractTsCol(pBlock, pInfo);
uint64_t tableGroupId = pBlock->info.groupId;
bool ascScan = (pInfo->order == TSDB_ORDER_ASC);
TSKEY ts = getStartTsKey(&pBlock->info.window, tsCols, pBlock->info.rows, ascScan);
SResultRow* pResult = NULL;
STimeWindow win = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval,
pInfo->interval.precision, &pInfo->win);
int32_t ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
// todo keep current outputbuffer info to avoid repeatly set the buffer info if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
pos->groupId = tableGroupId;
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey;
taosArrayPush(pUpdated, &pos);
}
TSKEY ekey = ascScan? win.ekey:win.skey;
int32_t forwardRows = getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
ASSERT(forwardRows > 0);
// prev time window not interpolation yet.
if (pInfo->timeWindowInterpo) {
SResultRowPosition pos = addToOpenWindowList(pResultRowInfo, pResult);
doInterpUnclosedTimeWindow(pOperatorInfo, numOfOutput, pResultRowInfo, pBlock, scanFlag, tsCols, &pos);
// restore current time window // restore current time window
ret = setTimeWindowOutputBuf(pResultRowInfo, &win, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, ret = setTimeWindowOutputBuf(pResultRowInfo, &win, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
// window start key interpolation // window start key interpolation
doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &win, startPos, forwardStep); doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &win, startPos, forwardRows);
} }
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pBlock->info.rows, numOfOutput, pInfo->order);
doCloseWindow(pResultRowInfo, pInfo, pResult);
STimeWindow nextWin = win; STimeWindow nextWin = win;
while (1) { while (1) {
int32_t prevEndPos = (forwardStep - 1) * step + startPos; int32_t prevEndPos = forwardRows - 1 + startPos;
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order); startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pBlock->info, tsCols, prevEndPos, pInfo->order);
if (startPos < 0) { if (startPos < 0) {
break; break;
...@@ -790,7 +785,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -790,7 +785,7 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
// null data, failed to allocate more memory buffer // null data, failed to allocate more memory buffer
int32_t code = int32_t code =
setTimeWindowOutputBuf(pResultRowInfo, &nextWin, masterScan, &pResult, tableGroupId, pInfo->binfo.pCtx, setTimeWindowOutputBuf(pResultRowInfo, &nextWin, (scanFlag == MAIN_SCAN), &pResult, tableGroupId, pInfo->binfo.pCtx,
numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo); numOfOutput, pInfo->binfo.rowCellInfoOffset, &pInfo->aggSup, pTaskInfo);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
...@@ -806,24 +801,61 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe ...@@ -806,24 +801,61 @@ static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
} }
ekey = ascScan? nextWin.ekey:nextWin.skey; ekey = ascScan? nextWin.ekey:nextWin.skey;
forwardStep = forwardRows =
getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order); getNumOfRowsInTimeWindow(&pBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, pInfo->order);
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep); doWindowBorderInterpolation(pInfo, pBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pBlock->info.rows, numOfOutput, pInfo->order);
doCloseWindow(pResultRowInfo, pInfo, pResult);
} }
if (pInfo->timeWindowInterpo) { if (pInfo->timeWindowInterpo) {
int32_t rowIndex = ascScan? (pBlock->info.rows - 1) : 0;
saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols); saveDataBlockLastRow(pInfo->pPrevValues, pBlock, pInfo->pInterpCols);
} }
return pUpdated; return pUpdated;
// updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false); }
void doCloseWindow(SResultRowInfo* pResultRowInfo, const SIntervalAggOperatorInfo* pInfo, SResultRow* pResult) {
// current result is done in computing final results.
if (pInfo->timeWindowInterpo && isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
closeResultRow(pResult);
tdListPopHead(pResultRowInfo->openWindow);
}
}
SResultRowPosition addToOpenWindowList(SResultRowInfo* pResultRowInfo, const SResultRow* pResult) {
SResultRowPosition pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
SListNode* pn = tdListGetTail(pResultRowInfo->openWindow);
if (pn == NULL) {
tdListAppend(pResultRowInfo->openWindow, &pos);
return pos;
}
SResultRowPosition* px = (SResultRowPosition*)pn->data;
if (px->pageId != pos.pageId || px->offset != pos.offset) {
tdListAppend(pResultRowInfo->openWindow, &pos);
}
return pos;
}
int64_t* extractTsCol(SSDataBlock* pBlock, const SIntervalAggOperatorInfo* pInfo) {
TSKEY* tsCols = NULL;
if (pBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData;
if (tsCols != NULL) {
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
}
}
return tsCols;
} }
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
...@@ -852,7 +884,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { ...@@ -852,7 +884,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; STableQueryInfo* pTableQueryInfo = pInfo->pCurrent;
setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window);
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag);
#if 0 // test for encode/decode result info #if 0 // test for encode/decode result info
if(pOperator->encodeResultRow){ if(pOperator->encodeResultRow){
...@@ -983,6 +1015,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { ...@@ -983,6 +1015,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
} }
SStateWindowOperatorInfo* pInfo = pOperator->info; SStateWindowOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
...@@ -1165,7 +1198,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1165,7 +1198,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
break; break;
} }
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the // The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now. // caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true); setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, MAIN_SCAN, true);
...@@ -1180,7 +1213,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { ...@@ -1180,7 +1213,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
continue; continue;
} }
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, pBlock->info.groupId); pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, MAIN_SCAN);
} }
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset); finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
...@@ -1230,26 +1263,26 @@ static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) { ...@@ -1230,26 +1263,26 @@ static bool allInvertible(SqlFunctionCtx* pFCtx, int32_t numOfCols) {
return true; return true;
} }
static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SArray** pInterpList, SArray** pPrevCols) { static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SIntervalAggOperatorInfo* pInfo) {
// the primary timestamp column // the primary timestamp column
bool needed = false; bool needed = false;
*pInterpList = taosArrayInit(4, sizeof(SColumn)); pInfo->pInterpCols = taosArrayInit(4, sizeof(SColumn));
*pPrevCols = taosArrayInit(4, sizeof(SGroupKeys)); pInfo->pPrevValues = taosArrayInit(4, sizeof(SGroupKeys));
{ // ts column { // ts column
SColumn c = {0}; SColumn c = {0};
c.colId = 1; c.colId = 1;
c.slotId = 0; c.slotId = pInfo->primaryTsIndex;
c.type = TSDB_DATA_TYPE_TIMESTAMP; c.type = TSDB_DATA_TYPE_TIMESTAMP;
c.bytes = sizeof(int64_t); c.bytes = sizeof(int64_t);
taosArrayPush(*pInterpList, &c); taosArrayPush(pInfo->pInterpCols, &c);
SGroupKeys key = {0}; SGroupKeys key = {0};
key.bytes = c.bytes; key.bytes = c.bytes;
key.type = c.type; key.type = c.type;
key.isNull = false; key.isNull = true; // to denote no value is assigned yet
key.pData = taosMemoryCalloc(1, c.bytes); key.pData = taosMemoryCalloc(1, c.bytes);
taosArrayPush(*pPrevCols, &key); taosArrayPush(pInfo->pPrevValues, &key);
} }
for(int32_t i = 0; i < numOfCols; ++i) { for(int32_t i = 0; i < numOfCols; ++i) {
...@@ -1259,7 +1292,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SArr ...@@ -1259,7 +1292,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SArr
SFunctParam* pParam = &pExpr->base.pParam[0]; SFunctParam* pParam = &pExpr->base.pParam[0];
SColumn c = *pParam->pCol; SColumn c = *pParam->pCol;
taosArrayPush(*pInterpList, &c); taosArrayPush(pInfo->pInterpCols, &c);
needed = true; needed = true;
SGroupKeys key = {0}; SGroupKeys key = {0};
...@@ -1267,7 +1300,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SArr ...@@ -1267,7 +1300,7 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SArr
key.type = c.type; key.type = c.type;
key.isNull = false; key.isNull = false;
key.pData = taosMemoryCalloc(1, c.bytes); key.pData = taosMemoryCalloc(1, c.bytes);
taosArrayPush(*pPrevCols, &key); taosArrayPush(pInfo->pPrevValues, &key);
} }
} }
...@@ -1302,7 +1335,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -1302,7 +1335,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols); pInfo->invertible = allInvertible(pInfo->binfo.pCtx, numOfCols);
pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API pInfo->invertible = false; // Todo(liuyao): Dependent TSDB API
pInfo->timeWindowInterpo = timeWindowinterpNeeded(pInfo->binfo.pCtx, numOfCols, &pInfo->pInterpCols, &pInfo->pPrevValues); pInfo->timeWindowInterpo = timeWindowinterpNeeded(pInfo->binfo.pCtx, numOfCols, pInfo);
if (pInfo->timeWindowInterpo) { if (pInfo->timeWindowInterpo) {
pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition)); pInfo->binfo.resultRowInfo.openWindow = tdListNew(sizeof(SResultRowPosition));
} }
...@@ -1780,7 +1813,7 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB ...@@ -1780,7 +1813,7 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
bool ascScan = true; bool ascScan = true;
TSKEY* tsCols = NULL; TSKEY* tsCols = NULL;
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t forwardStep = 0; int32_t forwardRows = 0;
if (pSDataBlock->pDataBlock != NULL) { if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
...@@ -1805,15 +1838,15 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB ...@@ -1805,15 +1838,15 @@ static SArray* doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataB
pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; pos->pos = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset};
*(int64_t*)pos->key = pResult->win.skey; *(int64_t*)pos->key = pResult->win.skey;
taosArrayPush(pUpdated, &pos); taosArrayPush(pUpdated, &pos);
forwardStep = forwardRows =
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, nextWin.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
// window start(end) key interpolation // window start(end) key interpolation
// disable it temporarily // disable it temporarily
// doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep); // doWindowBorderInterpolation(pInfo, pSDataBlock, numOfOutput, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows);
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardStep, tsCols, doApplyFunctions(pTaskInfo, pInfo->binfo.pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
int32_t prevEndPos = (forwardStep - 1) * step + startPos; int32_t prevEndPos = (forwardRows - 1) * step + startPos;
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order); startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
if (startPos < 0) { if (startPos < 0) {
break; break;
...@@ -2321,7 +2354,7 @@ static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator, ...@@ -2321,7 +2354,7 @@ static void doStreamSessionWindowAggImpl(SOperatorInfo* pOperator,
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
// window start(end) key interpolation // window start(end) key interpolation
// doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardStep, // doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->binfo.pCtx, pResult, &nextWin, startPos, forwardRows,
// pInfo->order, false); // pInfo->order, false);
int32_t winNum = getNumCompactWindow(pAggSup->pResultRows, winIndex, gap); int32_t winNum = getNumCompactWindow(pAggSup->pResultRows, winIndex, gap);
if (winNum > 0) { if (winNum > 0) {
......
...@@ -3946,7 +3946,6 @@ int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -3946,7 +3946,6 @@ int32_t uniqueFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
typedef struct STwaInfo { typedef struct STwaInfo {
double dOutput; double dOutput;
int8_t hasResult; // flag to denote has value
SPoint1 p; SPoint1 p;
STimeWindow win; STimeWindow win;
} STwaInfo; } STwaInfo;
...@@ -3977,28 +3976,25 @@ static double twa_get_area(SPoint1 s, SPoint1 e) { ...@@ -3977,28 +3976,25 @@ static double twa_get_area(SPoint1 s, SPoint1 e) {
return val; return val;
} }
#define INIT_INTP_POINT(_p, _k, _v) \
do { \
(_p).key = (_k); \
(_p).val = (_v); \
} while (0)
int32_t twaFunction(SqlFunctionCtx* pCtx) { int32_t twaFunction(SqlFunctionCtx* pCtx) {
SInputColumnInfoData* pInput = &pCtx->input; SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0]; SColumnInfoData* pInputCol = pInput->pData[0];
int32_t numOfElems = 0;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData; TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo); STwaInfo *pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SPoint1 *last = &pInfo->p;
int32_t numOfElems = 0;
SPoint1* last = &pInfo->p;
int32_t step = 1;//GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = pInput->startRowIndex; int32_t i = pInput->startRowIndex;
for (i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += step) {
if (!colDataIsNull_f(pInputCol->nullbitmap, i)) {
break;
}
}
if (pCtx->start.key != INT64_MIN) { if (pCtx->start.key != INT64_MIN) {
ASSERT((pCtx->start.key < tsList[i] && pCtx->order == TSDB_ORDER_ASC) || ASSERT((pCtx->start.key < tsList[i] && pCtx->order == TSDB_ORDER_ASC) ||
(pCtx->start.key > tsList[i] && pCtx->order == TSDB_ORDER_DESC)); (pCtx->start.key > tsList[i] && pCtx->order == TSDB_ORDER_DESC));
...@@ -4009,37 +4005,30 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { ...@@ -4009,37 +4005,30 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i)); GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i));
pInfo->dOutput += twa_get_area(pCtx->start, *last); pInfo->dOutput += twa_get_area(pCtx->start, *last);
pInfo->hasResult = DATA_SET_FLAG;
pInfo->win.skey = pCtx->start.key; pInfo->win.skey = pCtx->start.key;
numOfElems++; numOfElems++;
i += step; i += 1;
} else if (pInfo->p.key == INT64_MIN) { } else if (pInfo->p.key == INT64_MIN) {
last->key = tsList[i]; last->key = tsList[i];
GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i)); GET_TYPED_DATA(last->val, double, pInputCol->info.type, colDataGetData(pInputCol, i));
pInfo->hasResult = DATA_SET_FLAG;
pInfo->win.skey = last->key; pInfo->win.skey = last->key;
numOfElems++; numOfElems++;
i += step; i += 1;
} }
SPoint1 st = {0};
// calculate the value of // calculate the value of
switch(pInputCol->info.type) { switch(pInputCol->info.type) {
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
int8_t *val = (int8_t*) colDataGetData(pInputCol, 0); int8_t *val = (int8_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue; continue;
} }
#ifndef _TD_NINGSI_60 INIT_INTP_POINT(st, tsList[i], val[i]);
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st; pInfo->p = st;
} }
...@@ -4048,18 +4037,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { ...@@ -4048,18 +4037,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
case TSDB_DATA_TYPE_SMALLINT: { case TSDB_DATA_TYPE_SMALLINT: {
int16_t *val = (int16_t*) colDataGetData(pInputCol, 0); int16_t *val = (int16_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue; continue;
} }
#ifndef _TD_NINGSI_60 INIT_INTP_POINT(st, tsList[i], val[i]);
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st; pInfo->p = st;
} }
...@@ -4067,18 +4050,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { ...@@ -4067,18 +4050,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
} }
case TSDB_DATA_TYPE_INT: { case TSDB_DATA_TYPE_INT: {
int32_t *val = (int32_t*) colDataGetData(pInputCol, 0); int32_t *val = (int32_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue; continue;
} }
#ifndef _TD_NINGSI_60 INIT_INTP_POINT(st, tsList[i], val[i]);
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st; pInfo->p = st;
} }
...@@ -4086,18 +4063,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { ...@@ -4086,18 +4063,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
} }
case TSDB_DATA_TYPE_BIGINT: { case TSDB_DATA_TYPE_BIGINT: {
int64_t *val = (int64_t*) colDataGetData(pInputCol, 0); int64_t *val = (int64_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue; continue;
} }
#ifndef _TD_NINGSI_60 INIT_INTP_POINT(st, tsList[i], val[i]);
SPoint1 st = {.key = tsList[i], .val = (double) val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = (double)val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st; pInfo->p = st;
} }
...@@ -4105,18 +4076,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { ...@@ -4105,18 +4076,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
} }
case TSDB_DATA_TYPE_FLOAT: { case TSDB_DATA_TYPE_FLOAT: {
float *val = (float*) colDataGetData(pInputCol, 0); float *val = (float*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue; continue;
} }
#ifndef _TD_NINGSI_60 INIT_INTP_POINT(st, tsList[i], val[i]);
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = (double)val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st; pInfo->p = st;
} }
...@@ -4124,18 +4089,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { ...@@ -4124,18 +4089,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
} }
case TSDB_DATA_TYPE_DOUBLE: { case TSDB_DATA_TYPE_DOUBLE: {
double *val = (double*) colDataGetData(pInputCol, 0); double *val = (double*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue; continue;
} }
#ifndef _TD_NINGSI_60 INIT_INTP_POINT(st, tsList[i], val[i]);
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st; pInfo->p = st;
} }
...@@ -4143,18 +4102,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { ...@@ -4143,18 +4102,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
} }
case TSDB_DATA_TYPE_UTINYINT: { case TSDB_DATA_TYPE_UTINYINT: {
uint8_t *val = (uint8_t*) colDataGetData(pInputCol, 0); uint8_t *val = (uint8_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue; continue;
} }
#ifndef _TD_NINGSI_60 INIT_INTP_POINT(st, tsList[i], val[i]);
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st; pInfo->p = st;
} }
...@@ -4162,18 +4115,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { ...@@ -4162,18 +4115,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
} }
case TSDB_DATA_TYPE_USMALLINT: { case TSDB_DATA_TYPE_USMALLINT: {
uint16_t *val = (uint16_t*) colDataGetData(pInputCol, 0); uint16_t *val = (uint16_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue; continue;
} }
#ifndef _TD_NINGSI_60 INIT_INTP_POINT(st, tsList[i], val[i]);
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st; pInfo->p = st;
} }
...@@ -4181,18 +4128,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { ...@@ -4181,18 +4128,12 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
} }
case TSDB_DATA_TYPE_UINT: { case TSDB_DATA_TYPE_UINT: {
uint32_t *val = (uint32_t*) colDataGetData(pInputCol, 0); uint32_t *val = (uint32_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue; continue;
} }
#ifndef _TD_NINGSI_60 INIT_INTP_POINT(st, tsList[i], val[i]);
SPoint1 st = {.key = tsList[i], .val = val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st; pInfo->p = st;
} }
...@@ -4200,25 +4141,19 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { ...@@ -4200,25 +4141,19 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
} }
case TSDB_DATA_TYPE_UBIGINT: { case TSDB_DATA_TYPE_UBIGINT: {
uint64_t *val = (uint64_t*) colDataGetData(pInputCol, 0); uint64_t *val = (uint64_t*) colDataGetData(pInputCol, 0);
for (; i < pInput->numOfRows + pInput->startRowIndex; i += step) { for (; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
continue; continue;
} }
#ifndef _TD_NINGSI_60 INIT_INTP_POINT(st, tsList[i], val[i]);
SPoint1 st = {.key = tsList[i], .val = (double) val[i]};
#else
SPoint1 st;
st.key = tsList[i];
st.val = (double) val[i];
#endif
pInfo->dOutput += twa_get_area(pInfo->p, st); pInfo->dOutput += twa_get_area(pInfo->p, st);
pInfo->p = st; pInfo->p = st;
} }
break; break;
} }
default: assert(0); default: ASSERT(0);
} }
// the last interpolated time window value // the last interpolated time window value
...@@ -4229,7 +4164,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) { ...@@ -4229,7 +4164,7 @@ int32_t twaFunction(SqlFunctionCtx* pCtx) {
pInfo->win.ekey = pInfo->p.key; pInfo->win.ekey = pInfo->p.key;
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); SET_VAL(pResInfo, numOfElems, 1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -4250,7 +4185,7 @@ int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { ...@@ -4250,7 +4185,7 @@ int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo); STwaInfo *pInfo = (STwaInfo *)GET_ROWCELL_INTERBUF(pResInfo);
if (pInfo->hasResult != DATA_SET_FLAG) { if (pResInfo->numOfRes == 0) {
pResInfo->isNullRes = 1; pResInfo->isNullRes = 1;
} else { } else {
// assert(pInfo->win.ekey == pInfo->p.key && pInfo->hasResult == pResInfo->hasResult); // assert(pInfo->win.ekey == pInfo->p.key && pInfo->hasResult == pResInfo->hasResult);
......
...@@ -36,12 +36,7 @@ void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) { ...@@ -36,12 +36,7 @@ void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) {
if (pNode->nodeType == TEXPR_BINARYEXPR_NODE || pNode->nodeType == TEXPR_UNARYEXPR_NODE) { if (pNode->nodeType == TEXPR_BINARYEXPR_NODE || pNode->nodeType == TEXPR_UNARYEXPR_NODE) {
doExprTreeDestroy(&pNode, fp); doExprTreeDestroy(&pNode, fp);
} else if (pNode->nodeType == TEXPR_VALUE_NODE) {
taosVariantDestroy(pNode->pVal);
} else if (pNode->nodeType == TEXPR_COL_NODE) {
taosMemoryFreeClear(pNode->pSchema);
} }
taosMemoryFree(pNode); taosMemoryFree(pNode);
} }
...@@ -49,15 +44,6 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) { ...@@ -49,15 +44,6 @@ static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
if (*pExpr == NULL) { if (*pExpr == NULL) {
return; return;
} }
int32_t type = (*pExpr)->nodeType;
if (type == TEXPR_VALUE_NODE) {
taosVariantDestroy((*pExpr)->pVal);
taosMemoryFree((*pExpr)->pVal);
} else if (type == TEXPR_COL_NODE) {
taosMemoryFree((*pExpr)->pSchema);
}
taosMemoryFree(*pExpr); taosMemoryFree(*pExpr);
*pExpr = NULL; *pExpr = NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册