提交 901ff9bf 编写于 作者: 5 54liuyao

feat(sma):files factor

上级 63c4472c
......@@ -1439,8 +1439,10 @@ typedef struct {
int32_t code;
} STaskDropRsp;
#define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_AT_ONCE_SMA 0
#define STREAM_TRIGGER_AT_ONCE 1
#define STREAM_TRIGGER_WINDOW_CLOSE 2
#define STREAM_TRIGGER_WINDOW_CLOSE_SMA 3
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
......
......@@ -80,7 +80,7 @@ typedef struct SAlterDatabaseStmt {
typedef struct STableOptions {
ENodeType type;
char comment[TSDB_TB_COMMENT_LEN];
float filesFactor;
double filesFactor;
SNodeList* pRollupFuncs;
int32_t ttl;
SNodeList* pSma;
......
......@@ -59,6 +59,7 @@ typedef struct SScanLogicNode {
int8_t triggerType;
int64_t watermark;
int16_t tsColId;
double filesFactor;
} SScanLogicNode;
typedef struct SJoinLogicNode {
......@@ -113,6 +114,7 @@ typedef struct SWindowLogicNode {
SNode* pStateExpr;
int8_t triggerType;
int64_t watermark;
double filesFactor;
} SWindowLogicNode;
typedef struct SFillLogicNode {
......@@ -222,6 +224,7 @@ typedef struct STableScanPhysiNode {
int8_t triggerType;
int64_t watermark;
int16_t tsColId;
double filesFactor;
} STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode;
......@@ -272,6 +275,7 @@ typedef struct SWinodwPhysiNode {
SNode* pTspk; // timestamp primary key
int8_t triggerType;
int64_t watermark;
double filesFactor;
} SWinodwPhysiNode;
typedef struct SIntervalPhysiNode {
......
......@@ -36,6 +36,7 @@ typedef struct SPlanContext {
int64_t watermark;
char* pMsg;
int32_t msgLen;
double filesFactor;
} SPlanContext;
// Create the physical plan for the query, according to the AST.
......
......@@ -30,7 +30,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr,
int32_t* pLen);
int32_t* pLen, double filesFactor);
#ifdef __cplusplus
}
......
......@@ -36,7 +36,7 @@
extern bool tsStreamSchedV;
int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int64_t watermark, char** pStr,
int32_t* pLen) {
int32_t* pLen, double filesFactor) {
SNode* pAst = NULL;
SQueryPlan* pPlan = NULL;
terrno = TSDB_CODE_SUCCESS;
......@@ -58,6 +58,7 @@ int32_t mndConvertRSmaTask(const char* ast, int64_t uid, int8_t triggerType, int
.rSmaQuery = true,
.triggerType = triggerType,
.watermark = watermark,
.filesFactor = filesFactor,
};
if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
......
......@@ -397,13 +397,13 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt
req.pRSmaParam.xFilesFactor = pStb->xFilesFactor;
req.pRSmaParam.delay = pStb->delay;
if (pStb->ast1Len > 0) {
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len) !=
if (mndConvertRSmaTask(pStb->pAst1, pStb->uid, 0, 0, &req.pRSmaParam.qmsg1, &req.pRSmaParam.qmsg1Len, req.pRSmaParam.xFilesFactor) !=
TSDB_CODE_SUCCESS) {
return NULL;
}
}
if (pStb->ast2Len > 0) {
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len) !=
if (mndConvertRSmaTask(pStb->pAst2, pStb->uid, 0, 0, &req.pRSmaParam.qmsg2, &req.pRSmaParam.qmsg2Len, req.pRSmaParam.xFilesFactor) !=
TSDB_CODE_SUCCESS) {
return NULL;
}
......
......@@ -149,7 +149,7 @@ int32_t tdUpdateExpireWindow(SSma* pSma, SSubmitReq* pMsg, int64_t version);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);
int32_t tdProcessRSmaCreate(SSma* pSma, SMeta* pMeta, SVCreateStbReq* pReq, SMsgCb* pMsgCb);
int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq* pReq);
int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType);
int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid);
int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore);
......
......@@ -165,7 +165,10 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
* @param pReq
* @return int32_t
*/
int32_t tdProcessRSmaCreate(SSma *pSma, SMeta *pMeta, SVCreateStbReq *pReq, SMsgCb *pMsgCb) {
int32_t tdProcessRSmaCreate(SVnode *pVnode, SVCreateStbReq *pReq) {
SSma *pSma = pVnode->pSma;
SMeta *pMeta = pVnode->pMeta;
SMsgCb *pMsgCb = &pVnode->msgCb;
if (!pReq->rollup) {
smaTrace("vgId:%d return directly since no rollup for stable %s %" PRIi64, SMA_VID(pSma), pReq->name, pReq->suid);
return TSDB_CODE_SUCCESS;
......@@ -210,6 +213,7 @@ int32_t tdProcessRSmaCreate(SSma *pSma, SMeta *pMeta, SVCreateStbReq *pReq, SMsg
.reader = pReadHandle,
.meta = pMeta,
.pMsgCb = pMsgCb,
.vnode = pVnode,
};
if (param->qmsg1) {
......
......@@ -360,7 +360,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq,
goto _err;
}
tdProcessRSmaCreate(pVnode->pSma, pVnode->pMeta, &req, &pVnode->msgCb);
tdProcessRSmaCreate(pVnode, &req);
tDecoderClear(&coder);
return 0;
......
......@@ -440,6 +440,7 @@ typedef struct STimeWindowSupp {
int64_t waterMark;
TSKEY maxTs;
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
SHashObj *winMap;
} STimeWindowAggSupp;
typedef struct SIntervalAggOperatorInfo {
......@@ -758,7 +759,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SExecTaskInfo*
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
STimeWindowAggSupp* pTwSup, int16_t tsColId);
STimeWindowAggSupp* pTwSup);
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
......@@ -837,6 +838,8 @@ SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx);
int64_t getSmaWaterMark(int64_t interval, double filesFactor);
bool isSmaStream(int8_t triggerType);
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
#ifdef __cplusplus
......
......@@ -4529,8 +4529,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
}
SArray* tableIdList = extractTableIdList(pTableListInfo);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode,
pTaskInfo, &twSup, pTableScanNode->tsColId);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle,
tableIdList, pTableScanNode, pTaskInfo, &twSup);
taosArrayDestroy(tableIdList);
return pOperator;
......@@ -4631,7 +4631,19 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
STimeWindowAggSupp as = {.waterMark = pIntervalPhyNode->window.watermark,
.calTrigger = pIntervalPhyNode->window.triggerType,
.maxTs = INT64_MIN};
.maxTs = INT64_MIN,
.winMap = NULL,};
if (isSmaStream(pIntervalPhyNode->window.triggerType)) {
if (FLT_LESS(pIntervalPhyNode->window.filesFactor, 1.000000)) {
as.calTrigger = STREAM_TRIGGER_AT_ONCE_SMA;
} else {
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_TIMESTAMP);
as.winMap = taosHashInit(64, hashFn, true, HASH_NO_LOCK);
as.waterMark = getSmaWaterMark(interval.interval,
pIntervalPhyNode->window.filesFactor);
as.calTrigger = STREAM_TRIGGER_WINDOW_CLOSE_SMA;
}
}
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, &as, pTaskInfo);
......@@ -5300,3 +5312,18 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, const char* pKey) {
}
return createDiskbasedBuf(&pSup->pResultBuf, pageSize, bufSize, pKey, TD_TMP_DIR_PATH);
}
int64_t getSmaWaterMark(int64_t interval, double filesFactor) {
int64_t waterMark = 0;
ASSERT(FLT_GREATEREQUAL(filesFactor,0.000000));
waterMark = -1 * filesFactor;
return waterMark;
}
bool isSmaStream(int8_t triggerType) {
if (triggerType == STREAM_TRIGGER_AT_ONCE ||
triggerType == STREAM_TRIGGER_WINDOW_CLOSE) {
return false;
}
return true;
}
......@@ -875,7 +875,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
if (rows == 0) {
pOperator->status = OP_EXEC_DONE;
} else if (pInfo->pUpdateInfo) {
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true); // TODO(liuyao) get invertible from plan
SSDataBlock* upRes = getUpdateDataBlock(pInfo, true);
if (upRes) {
pInfo->pUpdateRes = upRes;
if (upRes->info.type == STREAM_REPROCESS) {
......@@ -894,7 +894,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHandle,
SArray* pTableIdList, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo,
STimeWindowAggSupp* pTwSup, int16_t tsColId) {
STimeWindowAggSupp* pTwSup) {
SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
......@@ -939,8 +939,12 @@ SOperatorInfo* createStreamScanOperatorInfo(void* pDataReader, SReadHandle* pHan
goto _error;
}
pInfo->primaryTsIndex = tsColId;
if (pSTInfo->interval.interval > 0) {
if (isSmaStream(pTableScanNode->triggerType)) {
pTwSup->waterMark = getSmaWaterMark(pSTInfo->interval.interval,
pTableScanNode->filesFactor);
}
pInfo->primaryTsIndex = 0; // pTableScanNode->tsColId;
if (pSTInfo->interval.interval > 0 && pDataReader) {
pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark);
} else {
pInfo->pUpdateInfo = NULL;
......
......@@ -748,10 +748,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM &&
(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == 0) ) {
saveResult(pResult, tableGroupId, pUpdated);
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE_SMA) {
saveResult(pResult, tableGroupId, pUpdated);
}
if (pInfo->twAggSup.winMap) {
taosHashRemove(pInfo->twAggSup.winMap, &win.skey, sizeof(TSKEY));
}
}
int32_t forwardStep = 0;
......@@ -824,10 +828,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM &&
(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == 0) ) {
saveResult(pResult, tableGroupId, pUpdated);
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE_SMA) {
saveResult(pResult, tableGroupId, pUpdated);
}
if (pInfo->twAggSup.winMap) {
taosHashRemove(pInfo->twAggSup.winMap, &win.skey, sizeof(TSKEY));
}
}
ekey = ascScan? nextWin.ekey:nextWin.skey;
......@@ -1172,15 +1180,23 @@ static int32_t closeIntervalWindow(SHashObj *pHashMap, STimeWindowAggSupp *pSup,
void* key = taosHashGetKey(pIte, &keyLen);
uint64_t groupId = *(uint64_t*) key;
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
TSKEY ts = *(uint64_t*) ((char*)key + sizeof(uint64_t));
TSKEY ts = *(int64_t*) ((char*)key + sizeof(uint64_t));
SResultRowInfo dumyInfo;
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, ts, pInterval,
pInterval->precision, NULL);
if (win.ekey < pSup->maxTs - pSup->waterMark) {
if (pSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE_SMA) {
if (taosHashGet(pSup->winMap, &win.skey, sizeof(TSKEY))) {
continue;
}
}
char keyBuf[GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY))];
SET_RES_WINDOW_KEY(keyBuf, &ts, sizeof(TSKEY), groupId);
taosHashRemove(pHashMap, keyBuf, keyLen);
if (pSup->calTrigger != STREAM_TRIGGER_AT_ONCE_SMA &&
pSup->calTrigger != STREAM_TRIGGER_WINDOW_CLOSE_SMA) {
taosHashRemove(pHashMap, keyBuf, keyLen);
}
SResKeyPos* pos = taosMemoryMalloc(sizeof(SResKeyPos) + sizeof(uint64_t));
if (pos == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
......@@ -1192,6 +1208,7 @@ static int32_t closeIntervalWindow(SHashObj *pHashMap, STimeWindowAggSupp *pSup,
taosMemoryFree(pos);
return TSDB_CODE_OUT_OF_MEMORY;
}
taosHashPut(pSup->winMap, &win.skey, sizeof(TSKEY), NULL, 0);
}
}
return TSDB_CODE_SUCCESS;
......@@ -1248,7 +1265,8 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
&pInfo->interval, pClosed);
finalizeUpdatedResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, pClosed,
pInfo->binfo.rowCellInfoOffset);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE ||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE_SMA) {
taosArrayAddAll(pUpdated, pClosed);
}
taosArrayDestroy(pClosed);
......@@ -2412,7 +2430,7 @@ int32_t closeSessionWindow(SArray *pWins, STimeWindowAggSupp *pTwSup, SArray *pC
return TSDB_CODE_OUT_OF_MEMORY;
}
pSeWin->isClosed = true;
if (calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) {
if (calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
pSeWin->isOutput = true;
}
}
......@@ -2486,7 +2504,7 @@ static SSDataBlock* doStreamSessionWindowAgg(SOperatorInfo* pOperator) {
SArray* pUpdated = taosArrayInit(16, POINTER_BYTES);
copyUpdateResult(pStUpdated, pUpdated, pBInfo->pRes->info.groupId);
taosHashCleanup(pStUpdated);
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER__WINDOW_CLOSE) {
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
taosArrayAddAll(pUpdated, pClosed);
}
......
......@@ -329,6 +329,10 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(intervalUnit);
COPY_SCALAR_FIELD(slidingUnit);
CLONE_NODE_FIELD(pTagCond);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(tsColId);
COPY_SCALAR_FIELD(filesFactor);
return (SNode*)pDst;
}
......@@ -385,6 +389,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
CLONE_NODE_FIELD(pStateExpr);
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(filesFactor);
return (SNode*)pDst;
}
......
......@@ -1133,6 +1133,7 @@ static const char* jkTableScanPhysiPlanSlidingUnit = "slidingUnit";
static const char* jkTableScanPhysiPlanTriggerType = "triggerType";
static const char* jkTableScanPhysiPlanWatermark = "watermark";
static const char* jkTableScanPhysiPlanTsColId = "tsColId";
static const char* jkTableScanPhysiPlanFilesFactor = "FilesFactor";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
......@@ -1183,6 +1184,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanTsColId, pNode->tsColId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddDoubleToObject(pJson, jkTableScanPhysiPlanFilesFactor, pNode->filesFactor);
}
return code;
}
......@@ -1242,7 +1246,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkTableScanPhysiPlanTsColId, pNode->tsColId, code);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetDoubleValue(pJson, jkTableScanPhysiPlanFilesFactor, &pNode->filesFactor);
}
return code;
}
......@@ -1496,6 +1502,7 @@ static const char* jkWindowPhysiPlanFuncs = "Funcs";
static const char* jkWindowPhysiPlanTsPk = "TsPk";
static const char* jkWindowPhysiPlanTriggerType = "TriggerType";
static const char* jkWindowPhysiPlanWatermark = "Watermark";
static const char* jkWindowPhysiPlanFilesFactor = "FilesFactor";
static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
const SWinodwPhysiNode* pNode = (const SWinodwPhysiNode*)pObj;
......@@ -1516,6 +1523,9 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkWindowPhysiPlanWatermark, pNode->watermark);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddDoubleToObject(pJson, jkWindowPhysiPlanFilesFactor, pNode->filesFactor);
}
return code;
}
......@@ -1541,6 +1551,9 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
tjsonGetNumberValue(pJson, jkWindowPhysiPlanWatermark, pNode->watermark, code);
;
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetDoubleValue(pJson, jkWindowPhysiPlanFilesFactor, &pNode->filesFactor);
}
return code;
}
......
......@@ -881,7 +881,7 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType
}
break;
case TABLE_OPTION_FILE_FACTOR:
((STableOptions*)pOptions)->filesFactor = taosStr2Float(((SToken*)pVal)->z, NULL);
((STableOptions*)pOptions)->filesFactor = taosStr2Double(((SToken*)pVal)->z, NULL);
break;
case TABLE_OPTION_ROLLUP:
((STableOptions*)pOptions)->pRollupFuncs = pVal;
......
......@@ -487,6 +487,10 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
pWindow->watermark = pCxt->pPlanCxt->watermark;
}
if (pCxt->pPlanCxt->rSmaQuery) {
pWindow->filesFactor = pCxt->pPlanCxt->filesFactor;
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
}
......
......@@ -99,7 +99,8 @@ static bool osdMayBeOptimized(SLogicNode* pNode) {
return false;
}
// todo: release after function splitting
if (TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType) {
if (TSDB_SUPER_TABLE == ((SScanLogicNode*)pNode)->pMeta->tableType &&
SCAN_TYPE_STREAM != ((SScanLogicNode*)pNode)->scanType) {
return false;
}
if (NULL == pNode->pParent || (QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) &&
......@@ -226,6 +227,7 @@ static void setScanWindowInfo(SScanLogicNode* pScan) {
pScan->triggerType = ((SWindowLogicNode*)pScan->node.pParent)->triggerType;
pScan->watermark = ((SWindowLogicNode*)pScan->node.pParent)->watermark;
pScan->tsColId = ((SColumnNode*)((SWindowLogicNode*)pScan->node.pParent)->pTspk)->colId;
pScan->filesFactor = ((SWindowLogicNode*)pScan->node.pParent)->filesFactor;
}
}
......
......@@ -506,6 +506,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan->triggerType = pScanLogicNode->triggerType;
pTableScan->watermark = pScanLogicNode->watermark;
pTableScan->tsColId = pScanLogicNode->tsColId;
pTableScan->filesFactor = pScanLogicNode->filesFactor;
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
}
......@@ -917,6 +918,7 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList*
pWindow->triggerType = pWindowLogicNode->triggerType;
pWindow->watermark = pWindowLogicNode->watermark;
pWindow->filesFactor = pWindowLogicNode->filesFactor;
if (TSDB_CODE_SUCCESS == code) {
*pPhyNode = (SPhysiNode*)pWindow;
......
......@@ -73,8 +73,8 @@ static int64_t adjustInterval(int64_t interval, int32_t precision) {
}
static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t watermark) {
if (watermark <= 0) {
watermark = TMIN(originInt/adjInterval, 1) * adjInterval;
if (watermark <= adjInterval) {
watermark = TMAX(originInt/adjInterval, 1) * adjInterval;
} else if (watermark > MAX_NUM_SCALABLE_BF * adjInterval) {
watermark = MAX_NUM_SCALABLE_BF * adjInterval;
}/* else if (watermark < MIN_NUM_SCALABLE_BF * adjInterval) {
......
......@@ -5,7 +5,7 @@ sleep 50
sql connect
print =============== create database
sql create database d1
sql create database d1 vgroups 1
sql use d1
print =============== create super table, include column type for count/sum/min/max/first
......
......@@ -23,7 +23,7 @@ sql insert into t1 values(1648791223001,10,2,3,1.1,2);
sql insert into t1 values(1648791233002,3,2,3,2.1,3);
sql insert into t1 values(1648791243003,NULL,NULL,NULL,NULL,4);
sql insert into t1 values(1648791213002,NULL,NULL,NULL,NULL,5) (1648791233012,NULL,NULL,NULL,NULL,6);
sleep 300
sql select * from streamt order by s desc;
# row 0
......@@ -115,7 +115,7 @@ sql insert into t1 values(1648791233002,3,2,3,2.1,9);
sql insert into t1 values(1648791243003,4,2,3,3.1,10);
sql insert into t1 values(1648791213002,4,2,3,4.1,11) ;
sql insert into t1 values(1648791213002,4,2,3,4.1,12) (1648791223009,4,2,3,4.1,13);
sleep 300
sql select * from streamt order by s desc ;
# row 0
......
......@@ -22,7 +22,7 @@ sql insert into t1 values(1648791210000,1,1,1,1.1,1);
sql insert into t1 values(1648791220000,2,2,2,2.1,2);
sql insert into t1 values(1648791230000,3,3,3,3.1,3);
sql insert into t1 values(1648791240000,4,4,4,4.1,4);
sleep 300
sql select * from streamt order by s desc;
# row 0
......@@ -50,7 +50,7 @@ sql insert into t1 values(1648791250005,5,5,5,5.1,5);
sql insert into t1 values(1648791260006,6,6,6,6.1,6);
sql insert into t1 values(1648791270007,7,7,7,7.1,7);
sql insert into t1 values(1648791240005,5,5,5,5.1,8) (1648791250006,6,6,6,6.1,9);
sleep 300
sql select * from streamt order by s desc;
# row 0
......@@ -100,7 +100,7 @@ sql insert into t1 values(1648791260007,7,7,7,7.1,12) (1648791290008,7,7,7,7.1,1
sql insert into t1 values(1648791500000,7,7,7,7.1,15) (1648791520000,8,8,8,8.1,16) (1648791540000,8,8,8,8.1,17);
sql insert into t1 values(1648791530000,8,8,8,8.1,18);
sql insert into t1 values(1648791220000,10,10,10,10.1,19) (1648791290008,2,2,2,2.1,20) (1648791540000,17,17,17,17.1,21) (1648791500001,22,22,22,22.1,22);
sleep 300
sql select * from streamt order by s desc;
# row 0
......
......@@ -94,92 +94,4 @@ if $data11 != 5 then
return -1
endi
sql create table t2(ts timestamp, a int, b int , c int, d double);
sql create stream streams2 trigger window_close watermark 20s into streamt2 as select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5 from t2 interval(10s);
sql insert into t2 values(1648791213000,1,2,3,1.0);
sql insert into t2 values(1648791239999,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 0 then
print ======$rows
return -1
endi
sql insert into t2 values(1648791240000,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 1 then
print ======$rows
return -1
endi
if $data01 != 1 then
print ======$data01
return -1
endi
sql insert into t2 values(1648791250001,1,2,3,1.0) (1648791250002,1,2,3,1.0) (1648791250003,1,2,3,1.0) (1648791240000,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 1 then
print ======$rows
return -1
endi
if $data01 != 1 then
print ======$data01
return -1
endi
sql insert into t2 values(1648791280000,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 4 then
print ======$rows
return -1
endi
if $data01 != 1 then
print ======$data01
return -1
endi
if $data11 != 1 then
print ======$data11
return -1
endi
if $data21 != 1 then
print ======$data21
return -1
endi
if $data31 != 3 then
print ======$data31
return -1
endi
sql insert into t2 values(1648791250001,1,2,3,1.0) (1648791250002,1,2,3,1.0) (1648791250003,1,2,3,1.0) (1648791280000,1,2,3,1.0) (1648791280001,1,2,3,1.0) (1648791280002,1,2,3,1.0) (1648791310000,1,2,3,1.0) (1648791280001,1,2,3,1.0);
sleep 300
sql select * from streamt2;
if $rows != 5 then
print ======$rows
return -1
endi
if $data01 != 1 then
print ======$data01
return -1
endi
if $data11 != 1 then
print ======$data11
return -1
endi
if $data21 != 1 then
print ======$data21
return -1
endi
if $data31 != 3 then
print ======$data31
return -1
endi
if $data41 != 3 then
print ======$data31
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册