提交 343988be 编写于 作者: H Haojun Liao

refactor: do some internal refactor.

上级 1f92000c
...@@ -295,19 +295,19 @@ typedef struct SPoint { ...@@ -295,19 +295,19 @@ typedef struct SPoint {
void * val; void * val;
} SPoint; } SPoint;
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey); //void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp); //void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput); //void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const int64_t* fillVal); //struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const SValueNode* val);
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo); //bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
//
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, //struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, // SInterval* pInterval, int32_t fillType,
struct SFillColInfo* pFillCol, const char* id); // struct SFillColInfo* pCol, const char* id);
//
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo); //void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity); //int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
int64_t getFillInfoStart(struct SFillInfo *pFillInfo); //int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType); int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
......
...@@ -427,6 +427,7 @@ typedef struct STableIntervalOperatorInfo { ...@@ -427,6 +427,7 @@ typedef struct STableIntervalOperatorInfo {
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model] EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator. SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
STimeWindowAggSupp twAggSup; STimeWindowAggSupp twAggSup;
struct SFillInfo* pFillInfo; // fill info
} STableIntervalOperatorInfo; } STableIntervalOperatorInfo;
typedef struct SAggOperatorInfo { typedef struct SAggOperatorInfo {
...@@ -467,7 +468,6 @@ typedef struct SFillOperatorInfo { ...@@ -467,7 +468,6 @@ typedef struct SFillOperatorInfo {
SSDataBlock* existNewGroupBlock; SSDataBlock* existNewGroupBlock;
bool multigroupResult; bool multigroupResult;
SInterval intervalInfo; SInterval intervalInfo;
int32_t capacity;
} SFillOperatorInfo; } SFillOperatorInfo;
typedef struct { typedef struct {
...@@ -609,7 +609,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t ...@@ -609,7 +609,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey); SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
SDiskbasedBuf* pBuf, int32_t* rowCellOffset); SDiskbasedBuf* pBuf, int32_t* rowCellOffset);
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
...@@ -621,7 +621,7 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput); ...@@ -621,7 +621,7 @@ void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput);
int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData, int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadInfo, int32_t numOfRows, char* pData,
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total, int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
SArray* pColList); SArray* pColList);
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow* win); void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win);
void doSetOperatorCompleted(SOperatorInfo* pOperator); void doSetOperatorCompleted(SOperatorInfo* pOperator);
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock); void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
...@@ -645,8 +645,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB ...@@ -645,8 +645,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId);
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo); SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
......
...@@ -22,15 +22,18 @@ extern "C" { ...@@ -22,15 +22,18 @@ extern "C" {
#include "os.h" #include "os.h"
#include "taosdef.h" #include "taosdef.h"
#include "tcommon.h"
struct SSDataBlock; struct SSDataBlock;
typedef struct SFillColInfo { typedef struct SFillColInfo {
STColumn col; // column info // STColumn col; // column info
SResSchema col;
int16_t functionId; // sql function id int16_t functionId; // sql function id
int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN
int16_t tagIndex; // index of current tag in SFillTagColInfo array list int16_t tagIndex; // index of current tag in SFillTagColInfo array list
union {int64_t i; double d;} fillVal; int32_t offset;
union {int64_t i; double d;} val;
} SFillColInfo; } SFillColInfo;
typedef struct { typedef struct {
...@@ -57,7 +60,6 @@ typedef struct SFillInfo { ...@@ -57,7 +60,6 @@ typedef struct SFillInfo {
char * nextValues; // next row of data char * nextValues; // next row of data
char** pData; // original result data block involved in filling data char** pData; // original result data block involved in filling data
int32_t alloc; // data buffer size in rows int32_t alloc; // data buffer size in rows
int8_t precision; // time resoluation
SFillColInfo* pFillCol; // column info for fill operations SFillColInfo* pFillCol; // column info for fill operations
SFillTagColInfo* pTags; // tags value for filling gap SFillTagColInfo* pTags; // tags value for filling gap
...@@ -67,7 +69,19 @@ typedef struct SFillInfo { ...@@ -67,7 +69,19 @@ typedef struct SFillInfo {
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows); int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
void taosFillSetStartInfo(struct SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey);
void taosResetFillInfo(struct SFillInfo* pFillInfo, TSKEY startTimestamp);
void taosFillSetInputDataBlock(struct SFillInfo* pFillInfo, const struct SSDataBlock* pInput);
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val);
bool taosFillHasMoreResults(struct SFillInfo* pFillInfo);
SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
SInterval* pInterval, int32_t fillType,
struct SFillColInfo* pCol, const char* id);
void* taosDestroyFillInfo(struct SFillInfo *pFillInfo);
int64_t taosFillResultDataBlock(struct SFillInfo* pFillInfo, void** output, int32_t capacity);
int64_t getFillInfoStart(struct SFillInfo *pFillInfo);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -14,11 +14,12 @@ ...@@ -14,11 +14,12 @@
*/ */
#include "filter.h" #include "filter.h"
#include "functionMgt.h"
#include "function.h" #include "function.h"
#include "functionMgt.h"
#include "os.h"
#include "querynodes.h" #include "querynodes.h"
#include "tname.h" #include "tname.h"
#include "os.h" #include "tfill.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tglobal.h" #include "tglobal.h"
...@@ -514,13 +515,12 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR ...@@ -514,13 +515,12 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR
return pResult; return pResult;
} }
static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, TSKEY ts, STimeWindow* w, TSKEY ekey, static void getInitialStartTimeWindow(SInterval* pInterval, int32_t precision, TSKEY ts, STimeWindow* w, bool ascQuery) {
bool ascQuery) {
if (ascQuery) { if (ascQuery) {
getAlignQueryTimeWindow(pInterval, precision, ts, ts, ekey, w); getAlignQueryTimeWindow(pInterval, precision, ts, w);
} else { } else {
// the start position of the first time window in the endpoint that spreads beyond the queried last timestamp // the start position of the first time window in the endpoint that spreads beyond the queried last timestamp
getAlignQueryTimeWindow(pInterval, precision, ts, ekey, ts, w); getAlignQueryTimeWindow(pInterval, precision, ts, w);
int64_t key = w->skey; int64_t key = w->skey;
while (key < ts) { // moving towards end while (key < ts) { // moving towards end
...@@ -540,7 +540,7 @@ static STimeWindow getActiveTimeWindow(SDiskbasedBuf * pBuf, SResultRowInfo* pRe ...@@ -540,7 +540,7 @@ static STimeWindow getActiveTimeWindow(SDiskbasedBuf * pBuf, SResultRowInfo* pRe
STimeWindow w = {0}; STimeWindow w = {0};
if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value if (pResultRowInfo->cur.pageId == -1) { // the first window, from the previous stored value
getInitialStartTimeWindow(pInterval, precision, ts, &w, win->ekey, true); getInitialStartTimeWindow(pInterval, precision, ts, &w, true);
w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1; w.ekey = taosTimeAdd(w.skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} else { } else {
w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win; w = getResultRowByPos(pBuf, &pResultRowInfo->cur)->win;
...@@ -2015,20 +2015,16 @@ static bool isCachedLastQuery(STaskAttr* pQueryAttr) { ...@@ -2015,20 +2015,16 @@ static bool isCachedLastQuery(STaskAttr* pQueryAttr) {
///////////////////////////////////////////////////////////////////////////////////////////// /////////////////////////////////////////////////////////////////////////////////////////////
// todo refactor : return window // todo refactor : return window
void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, int64_t keyFirst, int64_t keyLast, void getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key, STimeWindow* win) {
STimeWindow* win) {
ASSERT(key >= keyFirst && key <= keyLast);
win->skey = taosTimeTruncate(key, pInterval, precision); win->skey = taosTimeTruncate(key, pInterval, precision);
/* /*
* if the realSkey > INT64_MAX - pInterval->interval, the query duration between * if the realSkey > INT64_MAX - pInterval->interval, the query duration between
* realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges. * realSkey and realEkey must be less than one interval.Therefore, no need to adjust the query ranges.
*/ */
if (keyFirst > (INT64_MAX - pInterval->interval)) { win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
assert(keyLast - keyFirst < pInterval->interval); if (win->ekey < win->skey) {
win->ekey = INT64_MAX; win->ekey = INT64_MAX;
} else {
win->ekey = taosTimeAdd(win->skey, pInterval->interval, pInterval->intervalUnit, precision) - 1;
} }
} }
...@@ -3176,10 +3172,12 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo* ...@@ -3176,10 +3172,12 @@ int32_t doCopyToSDataBlock(SSDataBlock* pBlock, int32_t rowCapacity, SExprInfo*
// qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv)); // qDebug("QInfo:0x%"PRIx64" copy data to query buf completed", GET_TASKID(pRuntimeEnv));
pBlock->info.rows = numOfResult; pBlock->info.rows = numOfResult;
blockDataUpdateTsWindow(pBlock);
return 0; return 0;
} }
void toSDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, void doBuildResultDatablock(SSDataBlock* pBlock, int32_t rowCapacity, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf,
int32_t* rowCellOffset) { int32_t* rowCellOffset) {
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
...@@ -4813,7 +4811,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup) ...@@ -4813,7 +4811,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup)
} }
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
toSDatablock(pInfo->pRes, pOperator->resultInfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset); doBuildResultDatablock(pInfo->pRes, pOperator->resultInfo.capacity, &pAggInfo->groupResInfo, pOperator->pExpr, pAggInfo->aggSup.pResultBuf, pInfo->rowCellInfoOffset);
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -5126,6 +5124,8 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro ...@@ -5126,6 +5124,8 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
return NULL; return NULL;
} }
SSDataBlock* pBlock = pInfo->binfo.pRes;
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
return pOperator->getStreamResFn(pOperator, newgroup); return pOperator->getStreamResFn(pOperator, newgroup);
} else { } else {
...@@ -5134,15 +5134,15 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro ...@@ -5134,15 +5134,15 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
return NULL; return NULL;
} }
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, doBuildResultDatablock(pBlock, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; return pBlock->info.rows == 0 ? NULL : pBlock;
} }
} }
...@@ -5155,7 +5155,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup ...@@ -5155,7 +5155,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
} }
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
...@@ -5190,7 +5190,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup ...@@ -5190,7 +5190,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator, bool* newgroup
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
ASSERT(pInfo->binfo.pRes->info.rows > 0); ASSERT(pInfo->binfo.pRes->info.rows > 0);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
...@@ -5205,7 +5205,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -5205,7 +5205,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
STimeSliceOperatorInfo* pSliceInfo = pOperator->info; STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) { if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -5238,7 +5238,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { ...@@ -5238,7 +5238,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput); finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfOutput);
initGroupResInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); initGroupResInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) { if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
...@@ -5292,7 +5292,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup ...@@ -5292,7 +5292,7 @@ static SSDataBlock* doSTableIntervalAgg(SOperatorInfo* pOperator, bool* newgroup
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
toSDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, doBuildResultDatablock(pInfo->binfo.pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr,
pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
...@@ -5383,7 +5383,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -5383,7 +5383,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
...@@ -5415,7 +5415,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -5415,7 +5415,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) {
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -5432,7 +5432,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) ...@@ -5432,7 +5432,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
return NULL; return NULL;
...@@ -5464,7 +5464,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) ...@@ -5464,7 +5464,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup)
initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo); initGroupResInfo(&pInfo->groupResInfo, &pBInfo->resultRowInfo);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
toSDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset); doBuildResultDatablock(pBInfo->pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pBInfo->rowCellInfoOffset);
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
doSetOperatorCompleted(pOperator); doSetOperatorCompleted(pOperator);
} }
...@@ -5509,14 +5509,16 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -5509,14 +5509,16 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SResultInfo* pResultInfo = &pOperator->resultInfo; SResultInfo* pResultInfo = &pOperator->resultInfo;
blockDataCleanup(pInfo->pRes); SSDataBlock* pResBlock = pInfo->pRes;
blockDataCleanup(pResBlock);
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo); doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pInfo->pRes->info.rows > 0)) { if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) {
return pInfo->pRes; return pResBlock;
} }
SOperatorInfo* pDownstream = pOperator->pDownstream[0]; SOperatorInfo* pDownstream = pOperator->pDownstream[0];
...@@ -5551,25 +5553,25 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -5551,25 +5553,25 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) {
} }
} }
doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pInfo->capacity, pInfo->p); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pResBlock, pOperator->resultInfo.capacity, pInfo->p);
// current group has no more result to return // current group has no more result to return
if (pInfo->pRes->info.rows > 0) { if (pResBlock->info.rows > 0) {
// 1. The result in current group not reach the threshold of output result, continue // 1. The result in current group not reach the threshold of output result, continue
// 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
if (pInfo->pRes->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) { if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) {
return pInfo->pRes; return pResBlock;
} }
doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo); doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo);
if (pInfo->pRes->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) { if (pResBlock->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) {
return pInfo->pRes; return pResBlock;
} }
} else if (pInfo->existNewGroupBlock) { // try next group } else if (pInfo->existNewGroupBlock) { // try next group
assert(pBlock != NULL); assert(pBlock != NULL);
doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo); doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo);
if (pInfo->pRes->info.rows > pResultInfo->threshold) { if (pResBlock->info.rows > pResultInfo->threshold) {
return pInfo->pRes; return pResBlock;
} }
} else { } else {
return NULL; return NULL;
...@@ -5878,8 +5880,7 @@ _error: ...@@ -5878,8 +5880,7 @@ _error:
SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId, SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
SExecTaskInfo* pTaskInfo) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
...@@ -5890,10 +5891,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -5890,10 +5891,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
pInfo->execModel = pTaskInfo->execModel; pInfo->execModel = pTaskInfo->execModel;
pInfo->win = pTaskInfo->window; pInfo->win = pTaskInfo->window;
pInfo->win.skey = 0;
pInfo->win.ekey = INT64_MAX;
pInfo->primaryTsIndex = primaryTsSlotId;
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
pInfo->primaryTsIndex = primaryTsSlotId;
int32_t numOfRows = 4096; int32_t numOfRows = 4096;
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
...@@ -6063,18 +6062,14 @@ _error: ...@@ -6063,18 +6062,14 @@ _error:
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal, static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, int64_t* fillVal,
STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) { STimeWindow win, int32_t capacity, const char* id, SInterval* pInterval, int32_t fillType) {
struct SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, (int64_t*)fillVal); SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, NULL);
TSKEY sk = TMIN(win.skey, win.ekey);
TSKEY ek = TMAX(win.skey, win.ekey);
// TODO set correct time precision // TODO set correct time precision
STimeWindow w = TSWINDOW_INITIALIZER; STimeWindow w = TSWINDOW_INITIALIZER;
getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, sk, ek, &w); getAlignQueryTimeWindow(pInterval, TSDB_TIME_PRECISION_MILLI, win.skey, &w);
int32_t order = TSDB_ORDER_ASC; int32_t order = TSDB_ORDER_ASC;
pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval->sliding, pInfo->pFillInfo = taosCreateFillInfo(order, w.skey, 0, capacity, numOfCols, pInterval, fillType, pColInfo, id);
pInterval->slidingUnit, (int8_t)pInterval->precision, fillType, pColInfo, id);
pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
...@@ -6095,23 +6090,37 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp ...@@ -6095,23 +6090,37 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
pInfo->multigroupResult = multigroupResult; pInfo->multigroupResult = multigroupResult;
pInfo->intervalInfo = *pInterval; pInfo->intervalInfo = *pInterval;
int32_t type = TSDB_FILL_NONE;
switch (fillType) {
case FILL_MODE_PREV: type = TSDB_FILL_PREV;break;
case FILL_MODE_NONE: type = TSDB_FILL_NONE;break;
case FILL_MODE_NULL: type = TSDB_FILL_NULL;break;
case FILL_MODE_NEXT: type = TSDB_FILL_NEXT;break;
case FILL_MODE_VALUE: type = TSDB_FILL_SET_VALUE;break;
case FILL_MODE_LINEAR: type = TSDB_FILL_LINEAR;break;
default:
type = TSDB_FILL_NONE;
}
SResultInfo* pResultInfo = &pOperator->resultInfo; SResultInfo* pResultInfo = &pOperator->resultInfo;
int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*)fillVal, pTaskInfo->window, pResultInfo->capacity, initResultSizeInfo(pOperator, 4096);
pTaskInfo->id.str, pInterval, fillType);
int32_t code = initFillInfo(pInfo, pExpr, numOfCols, (int64_t*)fillVal, pTaskInfo->window, pResultInfo->capacity,
pTaskInfo->id.str, pInterval, type);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
goto _error; goto _error;
} }
pOperator->name = "FillOperator"; pOperator->name = "FillOperator";
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
// pOperator->operatorType = OP_Fill; // pOperator->operatorType = OP_Fill;
pOperator->pExpr = pExpr; pOperator->pExpr = pExpr;
pOperator->numOfOutput = numOfCols; pOperator->numOfOutput = numOfCols;
pOperator->info = pInfo; pOperator->info = pInfo;
pOperator->_openFn = operatorDummyOpenFn; pOperator->_openFn = operatorDummyOpenFn;
pOperator->getNextFn = doFill; pOperator->getNextFn = doFill;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->closeFn = destroySFillOperatorInfo; pOperator->closeFn = destroySFillOperatorInfo;
...@@ -6582,6 +6591,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6582,6 +6591,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId; int32_t primaryTsSlotId = ((SColumnNode*) pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pTableGroupInfo, pTaskInfo); pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, primaryTsSlotId, &as, pTableGroupInfo, pTaskInfo);
if (pIntervalPhyNode->pFill != NULL) {
pOptr = createFillOperatorInfo(pOptr, pExprInfo, num, &interval, pResBlock, pIntervalPhyNode->pFill->mode, NULL, false, pTaskInfo);
}
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode;
......
...@@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou ...@@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
SSDataBlock* pRes = pInfo->binfo.pRes; SSDataBlock* pRes = pInfo->binfo.pRes;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); doBuildResultDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
...@@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou ...@@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
while(1) { while(1) {
toSDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset); doBuildResultDatablock(pRes, pOperator->resultInfo.capacity, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset);
doFilter(pInfo->pCondition, pRes); doFilter(pInfo->pCondition, pRes);
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
......
...@@ -129,7 +129,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn ...@@ -129,7 +129,7 @@ static bool overlapWithTimeWindow(SInterval* pInterval, SDataBlockInfo* pBlockIn
// TSKEY ek = MAX(pQueryAttr->window.skey, pQueryAttr->window.ekey); // TSKEY ek = MAX(pQueryAttr->window.skey, pQueryAttr->window.ekey);
if (true) { if (true) {
getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey, sk, ek, &w); getAlignQueryTimeWindow(pInterval, pInterval->precision, pBlockInfo->window.skey, &w);
assert(w.ekey >= pBlockInfo->window.skey); assert(w.ekey >= pBlockInfo->window.skey);
if (w.ekey < pBlockInfo->window.ekey) { if (w.ekey < pBlockInfo->window.ekey) {
......
...@@ -13,17 +13,18 @@ ...@@ -13,17 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <function.h> #include "function.h"
#include "os.h" #include "os.h"
#include "querynodes.h"
#include "taosdef.h" #include "taosdef.h"
#include "tmsg.h" #include "tmsg.h"
#include "ttypes.h" #include "ttypes.h"
#include "tfill.h" #include "tfill.h"
#include "thash.h"
#include "function.h" #include "function.h"
#include "tcommon.h" #include "tcommon.h"
#include "thash.h"
#include "ttime.h" #include "ttime.h"
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC) #define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
...@@ -41,7 +42,7 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) { ...@@ -41,7 +42,7 @@ static void setTagsValue(SFillInfo* pFillInfo, void** data, int32_t genRows) {
assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags); assert(pCol->tagIndex >= 0 && pCol->tagIndex < pFillInfo->numOfTags);
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
assert (pTag->col.colId == pCol->col.colId); // assert (pTag->col.colId == pCol->col.colId);
assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type); assignVal(val1, pTag->tagVal, pCol->col.bytes, pCol->col.type);
} }
} }
...@@ -80,7 +81,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData ...@@ -80,7 +81,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
} }
char* output = elePtrAt(data[i], pCol->col.bytes, index); char* output = elePtrAt(data[i], pCol->col.bytes, index);
assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type); // assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type);
} }
} else { // no prev value yet, set the value for NULL } else { // no prev value yet, set the value for NULL
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
...@@ -96,7 +97,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData ...@@ -96,7 +97,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
} }
char* output = elePtrAt(data[i], pCol->col.bytes, index); char* output = elePtrAt(data[i], pCol->col.bytes, index);
assignVal(output, p + pCol->col.offset, pCol->col.bytes, pCol->col.type); // assignVal(output, p + pCol->offset, pCol->col.bytes, pCol->col.type);
} }
} else { // no prev value yet, set the value for NULL } else { // no prev value yet, set the value for NULL
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index); setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
...@@ -119,7 +120,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData ...@@ -119,7 +120,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
continue; continue;
} }
point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->col.offset}; point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->offset};
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes}; point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes};
point = (SPoint){.key = pFillInfo->currentKey, .val = val1}; point = (SPoint){.key = pFillInfo->currentKey, .val = val1};
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type); taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
...@@ -135,12 +136,13 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData ...@@ -135,12 +136,13 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, void** data, char** srcData
} }
char* val1 = elePtrAt(data[i], pCol->col.bytes, index); char* val1 = elePtrAt(data[i], pCol->col.bytes, index);
assignVal(val1, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); assignVal(val1, (char*)&pCol->val, pCol->col.bytes, pCol->col.type);
} }
} }
setTagsValue(pFillInfo, data, index); setTagsValue(pFillInfo, data, index);
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit, pFillInfo->precision); pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->interval.slidingUnit,
pFillInfo->interval.precision);
pFillInfo->numOfCurrent++; pFillInfo->numOfCurrent++;
} }
...@@ -152,7 +154,7 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) { ...@@ -152,7 +154,7 @@ static void initBeforeAfterDataBuf(SFillInfo* pFillInfo, char** next) {
*next = taosMemoryCalloc(1, pFillInfo->rowSize); *next = taosMemoryCalloc(1, pFillInfo->rowSize);
for (int i = 1; i < pFillInfo->numOfCols; i++) { for (int i = 1; i < pFillInfo->numOfCols; i++) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
setNull(*next + pCol->col.offset, pCol->col.type, pCol->col.bytes); setNull(*next + pCol->offset, pCol->col.type, pCol->col.bytes);
} }
} }
...@@ -160,7 +162,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* bu ...@@ -160,7 +162,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, char** srcData, char* bu
int32_t rowIndex = pFillInfo->index; int32_t rowIndex = pFillInfo->index;
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) { for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
SFillColInfo* pCol = &pFillInfo->pFillCol[i]; SFillColInfo* pCol = &pFillInfo->pFillCol[i];
memcpy(buf + pCol->col.offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes); memcpy(buf + pCol->offset, srcData[i] + rowIndex * pCol->col.bytes, pCol->col.bytes);
} }
} }
...@@ -227,21 +229,21 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR ...@@ -227,21 +229,21 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
if (i == 0 || (pCol->functionId != FUNCTION_COUNT && !isNull(src, pCol->col.type)) || if (i == 0 || (pCol->functionId != FUNCTION_COUNT && !isNull(src, pCol->col.type)) ||
(pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)) { (pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)) {
assignVal(output, src, pCol->col.bytes, pCol->col.type); assignVal(output, src, pCol->col.bytes, pCol->col.type);
memcpy(*prev + pCol->col.offset, src, pCol->col.bytes); memcpy(*prev + pCol->offset, src, pCol->col.bytes);
} else { // i > 0 and data is null , do interpolation } else { // i > 0 and data is null , do interpolation
if (pFillInfo->type == TSDB_FILL_PREV) { if (pFillInfo->type == TSDB_FILL_PREV) {
assignVal(output, *prev + pCol->col.offset, pCol->col.bytes, pCol->col.type); assignVal(output, *prev + pCol->offset, pCol->col.bytes, pCol->col.type);
} else if (pFillInfo->type == TSDB_FILL_LINEAR) { } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
assignVal(output, src, pCol->col.bytes, pCol->col.type); assignVal(output, src, pCol->col.bytes, pCol->col.type);
memcpy(*prev + pCol->col.offset, src, pCol->col.bytes); memcpy(*prev + pCol->offset, src, pCol->col.bytes);
} else if (pFillInfo->type == TSDB_FILL_NEXT) { } else if (pFillInfo->type == TSDB_FILL_NEXT) {
if (*next) { if (*next) {
assignVal(output, *next + pCol->col.offset, pCol->col.bytes, pCol->col.type); assignVal(output, *next + pCol->offset, pCol->col.bytes, pCol->col.type);
} else { } else {
setNull(output, pCol->col.type, pCol->col.bytes); setNull(output, pCol->col.type, pCol->col.bytes);
} }
} else { } else {
assignVal(output, (char*)&pCol->fillVal.i, pCol->col.bytes, pCol->col.type); assignVal(output, (char*)&pCol->val, pCol->col.bytes, pCol->col.type);
} }
} }
} }
...@@ -250,7 +252,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR ...@@ -250,7 +252,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, void** data, int32_t outputR
setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent); setTagsValue(pFillInfo, data, pFillInfo->numOfCurrent);
pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step, pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pFillInfo->interval.sliding * step,
pFillInfo->interval.slidingUnit, pFillInfo->precision); pFillInfo->interval.slidingUnit, pFillInfo->interval.precision);
pFillInfo->index += 1; pFillInfo->index += 1;
pFillInfo->numOfCurrent += 1; pFillInfo->numOfCurrent += 1;
} }
...@@ -301,7 +303,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t ...@@ -301,7 +303,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
bool exists = false; bool exists = false;
int32_t index = -1; int32_t index = -1;
for (int32_t j = 0; j < k; ++j) { for (int32_t j = 0; j < k; ++j) {
if (pFillInfo->pTags[j].col.colId == pColInfo->col.colId) { if (pFillInfo->pTags[j].col.colId == pColInfo->col.slotId) {
exists = true; exists = true;
index = j; index = j;
break; break;
...@@ -310,7 +312,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t ...@@ -310,7 +312,7 @@ static int32_t setTagColumnInfo(SFillInfo* pFillInfo, int32_t numOfCols, int32_t
if (!exists) { if (!exists) {
SSchema* pSchema = &pFillInfo->pTags[k].col; SSchema* pSchema = &pFillInfo->pTags[k].col;
pSchema->colId = pColInfo->col.colId; pSchema->colId = pColInfo->col.slotId;
pSchema->type = pColInfo->col.type; pSchema->type = pColInfo->col.type;
pSchema->bytes = pColInfo->col.bytes; pSchema->bytes = pColInfo->col.bytes;
...@@ -341,30 +343,40 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) { ...@@ -341,30 +343,40 @@ static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
} }
struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols, struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTags, int32_t capacity, int32_t numOfCols,
int64_t slidingTime, int8_t slidingUnit, int8_t precision, int32_t fillType, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol, const char* id) {
struct SFillColInfo* pCol, const char* id) {
if (fillType == TSDB_FILL_NONE) { if (fillType == TSDB_FILL_NONE) {
return NULL; return NULL;
} }
SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo)); SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo));
if (pFillInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
taosResetFillInfo(pFillInfo, skey); taosResetFillInfo(pFillInfo, skey);
pFillInfo->order = order; pFillInfo->order = order;
switch(fillType) {
case FILL_MODE_NONE: pFillInfo->type = TSDB_FILL_NONE; break;
case FILL_MODE_PREV: pFillInfo->type = TSDB_FILL_PREV; break;
case FILL_MODE_NULL: pFillInfo->type = TSDB_FILL_NULL; break;
case FILL_MODE_LINEAR: pFillInfo->type = TSDB_FILL_LINEAR;break;
case FILL_MODE_NEXT: pFillInfo->type = TSDB_FILL_NEXT; break;
default:
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
pFillInfo->type = fillType; pFillInfo->type = fillType;
pFillInfo->pFillCol = pCol; pFillInfo->pFillCol = pCol;
pFillInfo->numOfTags = numOfTags; pFillInfo->numOfTags = numOfTags;
pFillInfo->numOfCols = numOfCols; pFillInfo->numOfCols = numOfCols;
pFillInfo->precision = precision;
pFillInfo->alloc = capacity; pFillInfo->alloc = capacity;
pFillInfo->id = id; pFillInfo->id = id;
pFillInfo->interval = *pInterval;
pFillInfo->interval.interval = slidingTime; pFillInfo->pData = taosMemoryMalloc(POINTER_BYTES * numOfCols);
pFillInfo->interval.intervalUnit = slidingUnit;
pFillInfo->interval.sliding = slidingTime;
pFillInfo->interval.slidingUnit = slidingUnit;
pFillInfo->pData = taosMemoryMalloc(POINTER_BYTES * numOfCols);
// if (numOfTags > 0) { // if (numOfTags > 0) {
pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo)); pFillInfo->pTags = taosMemoryCalloc(numOfCols, sizeof(SFillTagColInfo));
...@@ -375,7 +387,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag ...@@ -375,7 +387,6 @@ struct SFillInfo* taosCreateFillInfo(int32_t order, TSKEY skey, int32_t numOfTag
pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc); pFillInfo->rowSize = setTagColumnInfo(pFillInfo, pFillInfo->numOfCols, pFillInfo->alloc);
assert(pFillInfo->rowSize > 0); assert(pFillInfo->rowSize > 0);
return pFillInfo; return pFillInfo;
} }
...@@ -417,7 +428,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) ...@@ -417,7 +428,7 @@ void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey)
pFillInfo->end = endKey; pFillInfo->end = endKey;
if (!FILL_IS_ASC_FILL(pFillInfo)) { if (!FILL_IS_ASC_FILL(pFillInfo)) {
pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval, pFillInfo->precision); pFillInfo->end = taosTimeTruncate(endKey, &pFillInfo->interval, pFillInfo->interval.precision);
} }
pFillInfo->index = 0; pFillInfo->index = 0;
...@@ -433,7 +444,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) ...@@ -433,7 +444,7 @@ void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput)
if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer if (TSDB_COL_IS_TAG(pCol->flag)) { // copy the tag value to tag value buffer
SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex]; SFillTagColInfo* pTag = &pFillInfo->pTags[pCol->tagIndex];
assert (pTag->col.colId == pCol->col.colId); assert (pTag->col.colId == pCol->col.slotId);
memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy?? memcpy(pTag->tagVal, pColData->pData, pCol->col.bytes); // TODO not memcpy??
} }
} }
...@@ -460,7 +471,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma ...@@ -460,7 +471,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
TSKEY ekey1 = ekey; TSKEY ekey1 = ekey;
if (!FILL_IS_ASC_FILL(pFillInfo)) { if (!FILL_IS_ASC_FILL(pFillInfo)) {
pFillInfo->end = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->precision); pFillInfo->end = taosTimeTruncate(ekey, &pFillInfo->interval, pFillInfo->interval.precision);
} }
int64_t numOfRes = -1; int64_t numOfRes = -1;
...@@ -471,7 +482,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma ...@@ -471,7 +482,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
pFillInfo->currentKey, pFillInfo->currentKey,
pFillInfo->interval.sliding, pFillInfo->interval.sliding,
pFillInfo->interval.slidingUnit, pFillInfo->interval.slidingUnit,
pFillInfo->precision); pFillInfo->interval.precision);
numOfRes += 1; numOfRes += 1;
assert(numOfRes >= numOfRows); assert(numOfRes >= numOfRows);
} else { // reach the end of data } else { // reach the end of data
...@@ -484,7 +495,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma ...@@ -484,7 +495,7 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
pFillInfo->currentKey, pFillInfo->currentKey,
pFillInfo->interval.sliding, pFillInfo->interval.sliding,
pFillInfo->interval.slidingUnit, pFillInfo->interval.slidingUnit,
pFillInfo->precision); pFillInfo->interval.precision);
numOfRes += 1; numOfRes += 1;
} }
...@@ -527,7 +538,7 @@ int64_t getFillInfoStart(struct SFillInfo *pFillInfo) { ...@@ -527,7 +538,7 @@ int64_t getFillInfoStart(struct SFillInfo *pFillInfo) {
return pFillInfo->start; return pFillInfo->start;
} }
struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const int64_t* fillVal) { struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, const struct SValueNode* val) {
int32_t offset = 0; int32_t offset = 0;
struct SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo)); struct SFillColInfo* pFillCol = taosMemoryCalloc(numOfOutput, sizeof(SFillColInfo));
...@@ -538,14 +549,15 @@ struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, co ...@@ -538,14 +549,15 @@ struct SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, co
for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = &pExpr[i]; SExprInfo* pExprInfo = &pExpr[i];
pFillCol[i].col.bytes = pExprInfo->base.resSchema.bytes; pFillCol[i].col = pExprInfo->base.resSchema;
pFillCol[i].col.type = (int8_t)pExprInfo->base.resSchema.type; pFillCol[i].offset = offset;
pFillCol[i].col.offset = offset;
pFillCol[i].col.colId = pExprInfo->base.resSchema.slotId;
pFillCol[i].tagIndex = -2; pFillCol[i].tagIndex = -2;
pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
if (pExprInfo->base.numOfParams > 0) {
pFillCol[i].flag = pExprInfo->base.pParam[0].pCol->flag; // always be the normal column for table query
}
// pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId; // pFillCol[i].functionId = pExprInfo->pExpr->_function.functionId;
pFillCol[i].fillVal.i = fillVal[i]; // pFillCol[i].val.d = *val;
offset += pExprInfo->base.resSchema.bytes; offset += pExprInfo->base.resSchema.bytes;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册