diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index 21d8405f582a7f3e1e47cb137a34b3fae71cc8f3..f1cfa58f62f8b80dc7f3725fcba6e0d3395e3d39 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -96,6 +96,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad); int32_t mndProcessRpcMsg(SRpcMsg *pMsg); int32_t mndProcessSyncMsg(SRpcMsg *pMsg); int32_t mndPreProcessMsg(SRpcMsg *pMsg); +void mndAbortPreprocessMsg(SRpcMsg *pMsg); /** * @brief Generate machine code diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 38a179869f603aaaedc71a3b3f0108075ddfcde4..b07e8f39d5bcae86ebd32208613f98c7e2bff497 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -106,6 +106,9 @@ typedef struct SInterpFuncLogicNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; + EFillMode fillMode; + SNode* pFillValues; // SNodeListNode + SNode* pTimeSeries; // SColumnNode } SInterpFuncLogicNode; typedef enum EModifyTableType { MODIFY_TABLE_TYPE_INSERT = 1, MODIFY_TABLE_TYPE_DELETE } EModifyTableType; @@ -309,6 +312,9 @@ typedef struct SInterpFuncPhysiNode { SNodeList* pFuncs; STimeWindow timeRange; int64_t interval; + EFillMode fillMode; + SNode* pFillValues; // SNodeListNode + SNode* pTimeSeries; // SColumnNode } SInterpFuncPhysiNode; typedef struct SJoinPhysiNode { diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index f3f147955a03cb99f1ea806441eddd63ba8f96fe..3b8a37f4208386eb06ff4f3153fab271e3ddb011 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -64,6 +64,8 @@ typedef struct { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb); +int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); + int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index fde016375be764a286c96828d1f95e35ecc333d8..9fb9bcebf7b8ae731b616bc5b151455e248230e7 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -564,6 +564,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265A) #define TSDB_CODE_PAR_GROUP_BY_NOT_ALLOWED_FUNC TAOS_DEF_ERROR_CODE(0, 0x265B) #define TSDB_CODE_PAR_INVALID_TABLE_OPTION TAOS_DEF_ERROR_CODE(0, 0x265C) +#define TSDB_CODE_PAR_INVALID_INTERP_CLAUSE TAOS_DEF_ERROR_CODE(0, 0x265D) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 5ea55d558ff39afda2f70ecb067abbd62adae073..95b721b4dd651ffd174e1b0a9bf4b15172af7203 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -550,6 +550,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { pMsg->msgType != TDMT_MND_TRANS_TIMER) { mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); + mndAbortPreprocessMsg(pMsg); + SEpSet epSet = {0}; mndGetMnodeEpSet(pMsg->info.node, &epSet); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index 5374f48e47521f7cdc477ac469ce0bf5fbbf7aef..f32a3129dece8e55d37d68d61e39787e31fb8bdb 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -25,6 +25,13 @@ int32_t mndPreProcessMsg(SRpcMsg *pMsg) { return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); } +void mndAbortPreprocessMsg(SRpcMsg *pMsg) { + if (TDMT_VND_QUERY != pMsg->msgType) return; + + SMnode *pMnode = pMsg->info.node; + qWorkerAbortPreprocessQueryMsg(pMnode->pQuery, pMsg); +} + int32_t mndProcessQueryMsg(SRpcMsg *pMsg) { int32_t code = -1; SMnode *pMnode = pMsg->info.node; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index f8650d95c17c9d265e5030def4f57887c187095e..c556e94c74b8f3795cca197fd26544579aceeaad 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2892,7 +2892,10 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin SArray* pChWins = getWinInfos(&pChInfo->streamAggSup, groupId); int32_t chWinSize = taosArrayGetSize(pChWins); int32_t index = binarySearch(pChWins, chWinSize, pParentWin->win.skey, TSDB_ORDER_DESC, getSessionWindowEndkey); - for (int32_t k = index; k > 0 && k < chWinSize; k++) { + if (index < 0) { + index = 0; + } + for (int32_t k = index; k < chWinSize; k++) { SResultWindowInfo* pcw = taosArrayGet(pChWins, k); if (pParentWin->win.skey <= pcw->win.skey && pcw->win.ekey <= pParentWin->win.ekey) { SResultRow* pChResult = NULL; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 583e8bd3007371b9bbfa500ec2f0bee8765b7207..c72d7b5106eb2f4ad14be2b1ed35b4da56c148fb 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1481,11 +1481,13 @@ int32_t minMaxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 if (pSBuf->assign && ((((*(double*)&pDBuf->v) < (*(double*)&pSBuf->v)) ^ isMinFunc) || !pDBuf->assign)) { *(double*)&pDBuf->v = *(double*)&pSBuf->v; replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos); + pDBuf->assign = true; } } else { if (pSBuf->assign && (((pDBuf->v < pSBuf->v) ^ isMinFunc) || !pDBuf->assign)) { pDBuf->v = pSBuf->v; replaceTupleData(&pDBuf->tuplePos, &pSBuf->tuplePos); + pDBuf->assign = true; } } pDResInfo->numOfRes = TMAX(pDResInfo->numOfRes, pSResInfo->numOfRes); @@ -1745,10 +1747,10 @@ int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { if (IS_INTEGER_TYPE(type)) { avg = pStddevRes->isum / ((double)pStddevRes->count); - pStddevRes->result = sqrt(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg); + pStddevRes->result = sqrt(fabs(pStddevRes->quadraticISum / ((double)pStddevRes->count) - avg * avg)); } else { avg = pStddevRes->dsum / ((double)pStddevRes->count); - pStddevRes->result = sqrt(pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg); + pStddevRes->result = sqrt(fabs(pStddevRes->quadraticDSum / ((double)pStddevRes->count) - avg * avg)); } return functionFinalize(pCtx, pBlock); @@ -5362,4 +5364,4 @@ int32_t interpFunction(SqlFunctionCtx* pCtx) { #endif return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index b12f910cb1273f7d517ff100981164b03898e5f6..5eaca60ea5d8c9f9439a7b9139789622ecad7dbf 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -464,6 +464,9 @@ static SNode* logicInterpFuncCopy(const SInterpFuncLogicNode* pSrc, SInterpFuncL CLONE_NODE_LIST_FIELD(pFuncs); COPY_OBJECT_FIELD(timeRange, sizeof(STimeWindow)); COPY_SCALAR_FIELD(interval); + COPY_SCALAR_FIELD(fillMode); + CLONE_NODE_FIELD(pFillValues); + CLONE_NODE_FIELD(pTimeSeries); return (SNode*)pDst; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 57d1ee78573eee8807d1e33bdd1f3f1f42e06d54..bde5159087634a2ad06b6b4a1e38ff06f7a32948 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2133,6 +2133,9 @@ static const char* jkInterpFuncPhysiPlanFuncs = "Funcs"; static const char* jkInterpFuncPhysiPlanStartTime = "StartTime"; static const char* jkInterpFuncPhysiPlanEndTime = "EndTime"; static const char* jkInterpFuncPhysiPlanInterval = "Interval"; +static const char* jkInterpFuncPhysiPlanFillMode = "FillMode"; +static const char* jkInterpFuncPhysiPlanFillValues = "FillValues"; +static const char* jkInterpFuncPhysiPlanTimeSeries = "TimeSeries"; static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { const SInterpFuncPhysiNode* pNode = (const SInterpFuncPhysiNode*)pObj; @@ -2153,6 +2156,15 @@ static int32_t physiInterpFuncNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanInterval, pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanFillValues, nodeToJson, pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddObject(pJson, jkInterpFuncPhysiPlanTimeSeries, nodeToJson, pNode->pTimeSeries); + } return code; } @@ -2176,6 +2188,15 @@ static int32_t jsonToPhysiInterpFuncNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkInterpFuncPhysiPlanInterval, &pNode->interval); } + if (TSDB_CODE_SUCCESS == code) { + tjsonGetNumberValue(pJson, jkInterpFuncPhysiPlanFillMode, pNode->fillMode, code); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanFillValues, &pNode->pFillValues); + } + if (TSDB_CODE_SUCCESS == code) { + code = jsonToNodeObject(pJson, jkInterpFuncPhysiPlanTimeSeries, &pNode->pTimeSeries); + } return code; } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 6b3fdad27d38eba5046eb50d889a13c1bd0f106b..ccb0d37da65a6be158dfb0295815151bb3ce36f1 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -700,8 +700,11 @@ SNode* addEveryClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pEvery) { SNode* addFillClause(SAstCreateContext* pCxt, SNode* pStmt, SNode* pFill) { CHECK_PARSER_STATUS(pCxt); - if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) { - ((SSelectStmt*)pStmt)->pFill = pFill; + if (QUERY_NODE_SELECT_STMT == nodeType(pStmt) && NULL != pFill) { + SFillNode* pFillClause = (SFillNode*)pFill; + nodesDestroyNode(pFillClause->pWStartTs); + pFillClause->pWStartTs = createPrimaryKeyCol(pCxt); + ((SSelectStmt*)pStmt)->pFill = (SNode*)pFillClause; } return pStmt; } @@ -909,7 +912,7 @@ SNode* createDefaultTableOptions(SAstCreateContext* pCxt) { pOptions->watermark1 = TSDB_DEFAULT_ROLLUP_WATERMARK; pOptions->watermark2 = TSDB_DEFAULT_ROLLUP_WATERMARK; pOptions->ttl = TSDB_DEFAULT_TABLE_TTL; - pOptions->commentNull = true; // mark null + pOptions->commentNull = true; // mark null return (SNode*)pOptions; } @@ -918,7 +921,7 @@ SNode* createAlterTableOptions(SAstCreateContext* pCxt) { STableOptions* pOptions = (STableOptions*)nodesMakeNode(QUERY_NODE_TABLE_OPTIONS); CHECK_OUT_OF_MEM(pOptions); pOptions->ttl = -1; - pOptions->commentNull = true; // mark null + pOptions->commentNull = true; // mark null return (SNode*)pOptions; } @@ -940,9 +943,9 @@ SNode* setTableOption(SAstCreateContext* pCxt, SNode* pOptions, ETableOptionType case TABLE_OPTION_ROLLUP: ((STableOptions*)pOptions)->pRollupFuncs = pVal; break; - case TABLE_OPTION_TTL:{ + case TABLE_OPTION_TTL: { int64_t ttl = taosStr2Int64(((SToken*)pVal)->z, NULL, 10); - if (ttl > INT32_MAX){ + if (ttl > INT32_MAX) { ttl = INT32_MAX; } // ttl can not be smaller than 0, because there is a limitation in sql.y (TTL NK_INTEGER) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 77ef9027cdc6f49651653eb3744d6091dbcdaea7..a60dba3a9a46a81a0a20dd98397dc83593e59468 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1932,26 +1932,33 @@ static int32_t getFillTimeRange(STranslateContext* pCxt, SNode* pWhere, STimeWin return code; } -static int32_t checkFill(STranslateContext* pCxt, SIntervalWindowNode* pInterval) { - SFillNode* pFill = (SFillNode*)pInterval->pFill; +static int32_t checkFill(STranslateContext* pCxt, SFillNode* pFill, SValueNode* pInterval) { + if (FILL_MODE_NONE == pFill->mode) { + return TSDB_CODE_SUCCESS; + } + if (TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_INITIALIZER) || TSWINDOW_IS_EQUAL(pFill->timeRange, TSWINDOW_DESC_INITIALIZER)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); } - int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey); - int64_t intervalRange = 0; - SValueNode* pInter = (SValueNode*)pInterval->pInterval; - if (TIME_IS_VAR_DURATION(pInter->unit)) { + // interp FILL clause + if (NULL == pInterval) { + return TSDB_CODE_SUCCESS; + } + + int64_t timeRange = TABS(pFill->timeRange.skey - pFill->timeRange.ekey); + int64_t intervalRange = 0; + if (TIME_IS_VAR_DURATION(pInterval->unit)) { int64_t f = 1; - if (pInter->unit == 'n') { + if (pInterval->unit == 'n') { f = 30L * MILLISECOND_PER_DAY; - } else if (pInter->unit == 'y') { + } else if (pInterval->unit == 'y') { f = 365L * MILLISECOND_PER_DAY; } - intervalRange = pInter->datum.i * f; + intervalRange = pInterval->datum.i * f; } else { - intervalRange = pInter->datum.i; + intervalRange = pInterval->datum.i; } if ((timeRange == 0) || (timeRange / intervalRange) >= MAX_INTERVAL_TIME_WINDOW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_FILL_TIME_RANGE); @@ -1967,7 +1974,7 @@ static int32_t translateFill(STranslateContext* pCxt, SNode* pWhere, SIntervalWi int32_t code = getFillTimeRange(pCxt, pWhere, &(((SFillNode*)pInterval->pFill)->timeRange)); if (TSDB_CODE_SUCCESS == code) { - code = checkFill(pCxt, pInterval); + code = checkFill(pCxt, (SFillNode*)pInterval->pFill, (SValueNode*)pInterval->pInterval); } return code; } @@ -2109,6 +2116,64 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { return code; } +static int32_t createDefaultFillNode(STranslateContext* pCxt, SNode** pOutput) { + SFillNode* pFill = (SFillNode*)nodesMakeNode(QUERY_NODE_FILL); + if (NULL == pFill) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pFill->mode = FILL_MODE_NONE; + + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + nodesDestroyNode((SNode*)pFill); + return TSDB_CODE_OUT_OF_MEMORY; + } + pCol->colId = PRIMARYKEY_TIMESTAMP_COL_ID; + strcpy(pCol->colName, PK_TS_COL_INTERNAL_NAME); + pFill->pWStartTs = (SNode*)pCol; + + *pOutput = (SNode*)pFill; + return TSDB_CODE_SUCCESS; +} + +static int32_t translateInterpFill(STranslateContext* pCxt, SSelectStmt* pSelect) { + int32_t code = TSDB_CODE_SUCCESS; + + if (NULL == pSelect->pFill) { + code = createDefaultFillNode(pCxt, &pSelect->pFill); + } + if (TSDB_CODE_SUCCESS == code) { + code = translateExpr(pCxt, &pSelect->pFill); + } + if (TSDB_CODE_SUCCESS == code) { + code = getFillTimeRange(pCxt, pSelect->pRange, &(((SFillNode*)pSelect->pFill)->timeRange)); + } + if (TSDB_CODE_SUCCESS == code) { + code = checkFill(pCxt, (SFillNode*)pSelect->pFill, (SValueNode*)pSelect->pEvery); + } + + return code; +} + +static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (!pSelect->hasInterpFunc) { + if (NULL != pSelect->pRange || NULL != pSelect->pEvery || NULL != pSelect->pFill) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE); + } + return TSDB_CODE_SUCCESS; + } + + int32_t code = translateExpr(pCxt, &pSelect->pRange); + if (TSDB_CODE_SUCCESS == code) { + code = translateExpr(pCxt, &pSelect->pEvery); + } + if (TSDB_CODE_SUCCESS == code) { + code = translateInterpFill(pCxt, pSelect); + } + return code; +} + static int32_t translatePartitionBy(STranslateContext* pCxt, SNodeList* pPartitionByList) { pCxt->currClause = SQL_CLAUSE_PARTITION_BY; return translateExprList(pCxt, pPartitionByList); @@ -2378,6 +2443,9 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) { if (TSDB_CODE_SUCCESS == code) { code = checkLimit(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = translateInterp(pCxt, pSelect); + } if (TSDB_CODE_SUCCESS == code) { code = rewriteUniqueStmt(pCxt, pSelect); } @@ -3388,18 +3456,18 @@ static int32_t buildCreateStbReq(STranslateContext* pCxt, SCreateTableStmt* pStm pReq->delay2 = pStmt->pOptions->maxDelay2; pReq->watermark1 = pStmt->pOptions->watermark1; pReq->watermark2 = pStmt->pOptions->watermark2; -// pReq->ttl = pStmt->pOptions->ttl; + // pReq->ttl = pStmt->pOptions->ttl; columnDefNodeToField(pStmt->pCols, &pReq->pColumns); columnDefNodeToField(pStmt->pTags, &pReq->pTags); pReq->numOfColumns = LIST_LENGTH(pStmt->pCols); pReq->numOfTags = LIST_LENGTH(pStmt->pTags); - if(pStmt->pOptions->commentNull == false){ + if (pStmt->pOptions->commentNull == false) { pReq->comment = strdup(pStmt->pOptions->comment); if (NULL == pReq->comment) { return TSDB_CODE_OUT_OF_MEMORY; } pReq->commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { pReq->commentLen = -1; } @@ -3452,14 +3520,14 @@ static int32_t buildAlterSuperTableReq(STranslateContext* pCxt, SAlterTableStmt* pAlterReq->alterType = pStmt->alterType; if (TSDB_ALTER_TABLE_UPDATE_OPTIONS == pStmt->alterType) { -// pAlterReq->ttl = pStmt->pOptions->ttl; + // pAlterReq->ttl = pStmt->pOptions->ttl; if (pStmt->pOptions->commentNull == false) { pAlterReq->comment = strdup(pStmt->pOptions->comment); if (NULL == pAlterReq->comment) { return TSDB_CODE_OUT_OF_MEMORY; } pAlterReq->commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { pAlterReq->commentLen = -1; } @@ -4725,7 +4793,7 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* return TSDB_CODE_OUT_OF_MEMORY; } req.commentLen = strlen(pStmt->pOptions->comment); - }else{ + } else { req.commentLen = -1; } req.ntb.schemaRow.nCols = LIST_LENGTH(pStmt->pCols); @@ -4878,11 +4946,11 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S struct SVCreateTbReq req = {0}; req.type = TD_CHILD_TABLE; req.name = strdup(pStmt->tableName); - req.ttl = pStmt->pOptions->ttl; + req.ttl = pStmt->pOptions->ttl; if (pStmt->pOptions->commentNull == false) { req.comment = strdup(pStmt->pOptions->comment); req.commentLen = strlen(pStmt->pOptions->comment); - } else{ + } else { req.commentLen = -1; } req.ctb.suid = suid; @@ -5460,15 +5528,14 @@ static int32_t buildUpdateOptionsReq(STranslateContext* pCxt, SAlterTableStmt* p pReq->newTTL = pStmt->pOptions->ttl; } - if (TSDB_CODE_SUCCESS == code){ - if(pStmt->pOptions->commentNull == false) { + if (TSDB_CODE_SUCCESS == code) { + if (pStmt->pOptions->commentNull == false) { pReq->newComment = strdup(pStmt->pOptions->comment); if (NULL == pReq->newComment) { code = TSDB_CODE_OUT_OF_MEMORY; } pReq->newCommentLen = strlen(pReq->newComment); - } - else{ + } else { pReq->newCommentLen = -1; } } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 95cdf58d5a5bc3d6e02e0bf67154839562d4a6e5..d94b430c452a0e131a1ef91f836615dad90bddaa 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -196,6 +196,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "%s function does not supportted in group query"; case TSDB_CODE_PAR_INVALID_TABLE_OPTION: return "Invalid option %s"; + case TSDB_CODE_PAR_INVALID_INTERP_CLAUSE: + return "Invalid usage of RANGE clause, EVERY clause or FILL clause"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/parser/test/parSelectTest.cpp b/source/libs/parser/test/parSelectTest.cpp index 693c422ae0bfd38da6fc69f33430aa46874278e5..11e09cd83d89f5ba02df18f5a439786ffe6d83c0 100644 --- a/source/libs/parser/test/parSelectTest.cpp +++ b/source/libs/parser/test/parSelectTest.cpp @@ -267,8 +267,6 @@ TEST_F(ParserSelectTest, interp) { run("SELECT INTERP(c1) FROM t1 EVERY(5s)"); - run("SELECT INTERP(c1) FROM t1 EVERY(5s) FILL(LINEAR)"); - run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s)"); run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)"); diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index b365afff02875114b4b61c598e6095c7ff4aab66..c7490faa0c28524830f16515e2eb16278b894706 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -508,10 +508,15 @@ static int32_t createInterpFuncLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p code = rewriteExprsForSelect(pInterpFunc->pFuncs, pSelect, SQL_CLAUSE_SELECT); } - if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pRange) { - // SRangeNode* pRange = (SRangeNode*)pSelect->pRange; - // pInterpFunc->timeRange.skey = ((SValueNode*)pRange->pStart)->datum.i; - // pInterpFunc->timeRange.ekey = ((SValueNode*)pRange->pEnd)->datum.i; + if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pFill) { + SFillNode* pFill = (SFillNode*)pSelect->pFill; + pInterpFunc->timeRange = pFill->timeRange; + pInterpFunc->fillMode = pFill->mode; + pInterpFunc->pTimeSeries = nodesCloneNode(pFill->pWStartTs); + pInterpFunc->pFillValues = nodesCloneNode(pFill->pValues); + if (NULL == pInterpFunc->pTimeSeries || (NULL != pFill->pValues && NULL == pInterpFunc->pFillValues)) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pEvery) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 70a8aca8bd2aa1f7968284ed98556a94ca368c62..6af5a15147b4805eec1c374020cdb93c1f67abf8 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -877,6 +877,12 @@ static int32_t createInterpFuncPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pCh if (TSDB_CODE_SUCCESS == code) { pInterpFunc->timeRange = pFuncLogicNode->timeRange; pInterpFunc->interval = pFuncLogicNode->interval; + pInterpFunc->fillMode = pFuncLogicNode->fillMode; + pInterpFunc->pFillValues = nodesCloneNode(pFuncLogicNode->pFillValues); + pInterpFunc->pTimeSeries = nodesCloneNode(pFuncLogicNode->pTimeSeries); + if (NULL == pInterpFunc->pTimeSeries || (NULL != pFuncLogicNode->pFillValues && NULL == pInterpFunc->pFillValues)) { + code = TSDB_CODE_OUT_OF_MEMORY; + } } if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index ecff861f50d5c31593edeaab28097308a5316b64..8c7c030dce584dca94d843d85876b3cf29538ac2 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -23,6 +23,7 @@ extern "C" { #include "qwInt.h" #include "dataSinkMgt.h" +int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF); int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t explain, const char* sql); int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg); diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 5635ec8fc61c2de62b7de4a34168fdcf03e40820..82a62b5c5a65c5a8e62e4103b674c81b4fe6b03a 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -283,6 +283,26 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; } +int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { + if (NULL == qWorkerMgmt || NULL == pMsg) { + QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + SSubQueryMsg *msg = pMsg->pCont; + SQWorker * mgmt = (SQWorker *)qWorkerMgmt; + + uint64_t sId = msg->sId; + uint64_t qId = msg->queryId; + uint64_t tId = msg->taskId; + int64_t rId = msg->refId; + + QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle); + qwAbortPrerocessQuery(QW_FPARAMS()); + QW_SCH_TASK_DLOG("Abort prerocessQuery end, handle:%p", pMsg->info.handle); + + return TSDB_CODE_SUCCESS; +} + int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 800cc4c6e59ff7da06f699ce6fb8294819873c14..0f95da42e208e83115fe88fad03b84a7b8691710 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -482,6 +482,13 @@ _return: QW_RET(code); } +int32_t qwAbortPrerocessQuery(QW_FPARAMS_DEF) { + QW_ERR_RET(qwDropTask(QW_FPARAMS())); + + QW_RET(TSDB_CODE_SUCCESS); +} + + int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; bool queryRsped = false; diff --git a/tests/pytest/util/common.py b/tests/pytest/util/common.py index 7b00e6f331f6053c96ce56b5a79219b6967c6ecd..e158f8edd2e30386afad73f688a25c0f7c56fc46 100644 --- a/tests/pytest/util/common.py +++ b/tests/pytest/util/common.py @@ -11,13 +11,20 @@ # -*- coding: utf-8 -*- +from collections import defaultdict import random import string -from util.sql import tdSql -from util.dnodes import tdDnodes import requests import time import socket + +import taos +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * + class TDCom: def init(self, conn, logSql): tdSql.init(conn.cursor(), logSql) @@ -170,4 +177,122 @@ class TDCom: def close(self): self.cursor.close() + def create_database(self,tsql, dbName='test',dropFlag=1,precision="ms", **kwargs): + if dropFlag == 1: + tsql.execute("drop database if exists %s"%(dbName)) + ''' + vgroups replica precision strict wal fsync comp cachelast single_stable buffer pagesize pages minrows maxrows duration keep retentions + ''' + sqlString = f'create database if not exists {dbName} precision "{precision}" vgroups 4' + if len(kwargs) > 0: + dbParams = "" + for param, value in kwargs.items(): + dbParams += f'{param} {value} ' + sqlString += f'{dbParams}' + + tsql.execute(sqlString) + tdLog.debug("complete to create database %s"%(dbName)) + return + + def create_stable(self,tsql, dbName,stbName,columnDict,tagDict): + colSchema = '' + for i in range(columnDict['int']): + colSchema += ', c%d int'%i + tagSchema = '' + for i in range(tagDict['int']): + if i > 0: + tagSchema += ',' + tagSchema += 't%d int'%i + + tsql.execute("create table if not exists %s.%s (ts timestamp %s) tags(%s)"%(dbName, stbName, colSchema, tagSchema)) + tdLog.debug("complete to create %s.%s" %(dbName, stbName)) + return + + def create_ctables(self,tsql, dbName,stbName,ctbNum,tagDict): + tsql.execute("use %s" %dbName) + tagsValues = '' + for i in range(tagDict['int']): + if i > 0: + tagsValues += ',' + tagsValues += '%d'%i + + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + sql += " %s_%d using %s tags(%s)"%(stbName,i,stbName,tagsValues) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + if startTs == 0: + t = time.time() + startTs = int(round(t * 1000)) + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + for i in range(ctbNum): + sql += " %s_%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, %d)"%(startTs + j, j, j) + if (j > 0) and ((j%batchNum == 0) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def getClientCfgPath(self): + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + return cfgPath + + def newcur(self,host='localhost',port=6030,user='root',password='taosdata'): + cfgPath = self.getClientCfgPath() + con=taos.connect(host=host, user=user, password=password, config=cfgPath, port=port) + cur=con.cursor() + print(cur) + return cur + + def newTdSql(self, host='localhost',port=6030,user='root',password='taosdata'): + newTdSql = TDSql() + cur = self.newcur(host=host,port=port,user=user,password=password) + newTdSql.init(cur, False) + return newTdSql + tdCom = TDCom() diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index a8ec0f515180cddc7f84260cbd277c6d2963c065..720625a5700da354d52e4ba9e9bc7e2c4921de14 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -73,6 +73,7 @@ ./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic2.sim # ./test.sh -f tsim/stream/distributeInterval0.sim +# ./test.sh -f tsim/stream/distributesession0.sim # ./test.sh -f tsim/stream/session0.sim # ./test.sh -f tsim/stream/session1.sim # ./test.sh -f tsim/stream/state0.sim diff --git a/tests/script/tsim/stream/distributesession0.sim b/tests/script/tsim/stream/distributesession0.sim new file mode 100644 index 0000000000000000000000000000000000000000..78f65ed8a3bc7e3eeed85f0db00bfaf5395f46e0 --- /dev/null +++ b/tests/script/tsim/stream/distributesession0.sim @@ -0,0 +1,58 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +sql create dnode $hostname2 port 7200 + +system sh/exec.sh -n dnode2 -s start + +sql create database test vgroups 4; +sql use test; +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create stream stream_t1 trigger at_once into streamtST as select _wstartts, count(*) c1, sum(a) c2 , max(b) c3 from st session(ts, 10s) ; + +sleep 1000 + +sql insert into ts1 values(1648791211000,1,1,1) (1648791211005,1,1,1); +sql insert into ts2 values(1648791221004,1,2,3) (1648791221008,2,2,3); +sql insert into ts1 values(1648791211005,1,1,1); +sql insert into ts2 values(1648791221006,5,5,5) (1648791221007,5,5,5); +sql insert into ts2 values(1648791221008,5,5,5) (1648791221008,5,5,5)(1648791221006,5,5,5); +sql insert into ts1 values(1648791231000,1,1,1) (1648791231002,1,1,1) (1648791231006,1,1,1); +sql insert into ts1 values(1648791211000,6,6,6) (1648791231002,2,2,2); +sql insert into ts1 values(1648791211002,7,7,7); +sql insert into ts1 values(1648791211002,7,7,7) ts2 values(1648791221008,5,5,5) ; + +$loop_count = 0 +loop1: +sql select * from streamtST; + +sleep 300 +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +# row 0 +if $data01 != 10 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 34 then + print =====data02=$data02 + goto loop1 +endi + +if $data03 != 7 then + print ======$data03 + return -1 +endi + +system sh/stop_dnodes.sh \ No newline at end of file diff --git a/tests/system-test/2-query/Today.py b/tests/system-test/2-query/Today.py index 9eb06de9fb16731bbaa69460f24f4ecff78a2ff3..a9c3215a1283915e7c52702ef5ea80237c2d952a 100644 --- a/tests/system-test/2-query/Today.py +++ b/tests/system-test/2-query/Today.py @@ -4,394 +4,163 @@ from util.dnodes import * from util.log import * from util.sql import * from util.cases import * +import datetime class TDTestCase: - updatecfgDict = {'rpcDebugFlag': '143'} def init(self, conn, logSql): tdLog.debug(f"start to excute {__file__}") tdSql.init(conn.cursor()) - - def run(self): # sourcery skip: extract-duplicate-method - # for func now() , today(), timezone() - tdSql.prepare() - today_date = datetime.datetime.strptime( + self.today_date = datetime.datetime.strptime( datetime.datetime.now().strftime("%Y-%m-%d"), "%Y-%m-%d") + self.time_unit = ['b','u','a','s','m','h','d','w'] + self.error_param = ['1.5','abc','!@#','"abc"','today()'] + self.arithmetic_operators = ['+','-','*','/'] + self.relational_operator = ['<','<=','=','>=','>'] + # prepare data + self.ntbname = 'ntb' + self.stbname = 'stb' + self.column_dict = { + 'ts':'timestamp', + 'c1':'int', + 'c2':'float', + 'c3':'double', + 'c4':'timestamp' + } + self.tag_dict = { + 't0':'int' + } + self.tbnum = 2 + self.tag_values = [ + f'10', + f'100' + ] + self.values_list = [f'now,1,1.55,100.555555,today()', + f'now+1d,10,11.11,99.999999,now()', + f'today(),3,3.333,333.333333,now()', + f'today()-1d,10,11.11,99.999999,now()', + f'today()+1d,1,1.55,100.555555,today()'] + + def set_create_normaltable_sql(self, ntbname, column_dict): + column_sql = '' + for k, v in column_dict.items(): + column_sql += f"{k} {v}," + create_ntb_sql = f'create table {ntbname} ({column_sql[:-1]})' + return create_ntb_sql + def set_create_stable_sql(self,stbname,column_dict,tag_dict): + column_sql = '' + tag_sql = '' + for k,v in column_dict.items(): + column_sql += f"{k} {v}," + for k,v in tag_dict.items(): + tag_sql += f"{k} {v}," + create_stb_sql = f'create table {stbname} ({column_sql[:-1]}) tags({tag_sql[:-1]})' + return create_stb_sql + def data_check(self,column_dict={},tbname = '',values_list = [],tb_num = 1,tb = 'tb'): + for k,v in column_dict.items(): + num_up = 0 + num_down = 0 + num_same = 0 + if v.lower() == 'timestamp': + tdSql.query(f'select {k} from {tbname}') + for i in tdSql.queryResult: + if i[0] > self.today_date: + num_up += 1 + elif i[0] == self.today_date: + num_same += 1 + elif i[0] < self.today_date: + num_down += 1 + tdSql.query(f"select today() from {tbname}") + tdSql.checkRows(len(values_list)*tb_num) + tdSql.checkData(0, 0, str(self.today_date)) + tdSql.query(f"select * from {tbname} where {k}=today()") + if tb == 'tb': + tdSql.checkRows(num_same*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_same) + for i in [f'{tbname}',f'db.{tbname}']: + for unit in self.time_unit: + for symbol in ['+','-']: + tdSql.query(f"select today() {symbol}1{unit} from {i}") + tdSql.checkRows(len(values_list)*tb_num) + for unit in self.error_param: + for symbol in self.arithmetic_operators: + tdSql.error(f'select today() {symbol}{unit} from {i}') + for symbol in self.arithmetic_operators: + tdSql.query(f'select now(){symbol}null from {i}') + tdSql.checkData(0,0,None) + for symbol in self.relational_operator: + tdSql.query(f'select * from {i} where {k} {symbol} today()') + if symbol == '<' : + if tb == 'tb': + tdSql.checkRows(num_down*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_down) + elif symbol == '<=': + if tb == 'tb': + tdSql.checkRows((num_same+num_down)*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_same+num_down) + elif symbol == '=': + if tb == 'tb': + tdSql.checkRows(num_same*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_same) + elif symbol == '>=': + if tb == 'tb': + tdSql.checkRows((num_up + num_same)*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_up + num_same) + elif symbol == '>': + if tb == 'tb': + tdSql.checkRows(num_up*tb_num) + elif tb == 'stb': + tdSql.checkRows(num_up) + tdSql.query(f"select today()/0 from {tbname}") + tdSql.checkRows(len(values_list)*tb_num) + tdSql.checkData(0,0,None) + tdSql.query(f"select {k} from {tbname} where {k}=today()") + if tb == 'tb': + tdSql.checkRows(num_same*tb_num) + for i in range(num_same*tb_num): + tdSql.checkData(i, 0, str(self.today_date)) + elif tb == 'stb': + tdSql.checkRows(num_same) + for i in range(num_same): + tdSql.checkData(i, 0, str(self.today_date)) + def today_check_ntb(self): + tdSql.prepare() + tdSql.execute(self.set_create_normaltable_sql(self.ntbname,self.column_dict)) + for i in self.values_list: + tdSql.execute( + f'insert into {self.ntbname} values({i})') + self.data_check(self.column_dict,self.ntbname,self.values_list) + tdSql.execute('drop database db') + def today_check_stb_tb(self): + tdSql.prepare() + tdSql.execute(self.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict)) + for i in range(self.tbnum): + tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags({self.tag_values[i]})') + for j in self.values_list: + tdSql.execute(f'insert into {self.stbname}_{i} values ({j})') + # check child table + for i in range(self.tbnum): + self.data_check(self.column_dict,f'{self.stbname}_{i}',self.values_list) + # check stable + self.data_check(self.column_dict,self.stbname,self.values_list,self.tbnum,'stb') + tdSql.execute('drop database db') - tdLog.printNoPrefix("==========step1:create tables==========") - tdSql.execute( - '''create table if not exists ntb - (ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) - ''' - ) - tdSql.execute( - '''create table if not exists stb - (ts timestamp, c1 int, c2 float,c3 double,c4 timestamp) tags(t0 int) - ''' - ) - tdSql.execute( - '''create table if not exists stb_1 using stb tags(100) - ''' - ) - tdLog.printNoPrefix("==========step2:insert data into ntb==========") - tdSql.execute( - 'insert into ntb values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())') - tdSql.execute( - 'insert into stb_1 values(now,1,1.55,100.555555,today())("2020-1-1 00:00:00",10,11.11,99.999999,now())(today(),3,3.333,333.333333,now())') - tdLog.printNoPrefix("==========step2:query test of ntb ==========") - - # test function today() - # normal table - tdSql.query("select today() from ntb") - tdSql.checkRows(3) - tdSql.checkData(0, 0, str(today_date)) - tdSql.query("select today() from db.ntb") - tdSql.checkRows(3) - tdSql.checkData(0, 0, str(today_date)) - tdSql.query("select today() +1w from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1w from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1d from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1d from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1h from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1h from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1m from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1m from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1s from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1s from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1a from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1a from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1u from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1u from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1b from ntb") - tdSql.checkRows(3) - tdSql.query("select today() +1b from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1w from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1w from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1d from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1d from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1h from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1h from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1m from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1m from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1s from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1s from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1a from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1a from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1u from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1u from db.ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1b from ntb") - tdSql.checkRows(3) - tdSql.query("select today() -1b from db.ntb") - tdSql.checkRows(3) - tdSql.query("select * from ntb where ts=today()") - tdSql.checkRows(1) - tdSql.checkData(0, 1, 3) - tdSql.query("select * from ntb where ts<=today()") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 10) - # for bug - # tdSql.query("select * from ntb where ts=today()") - tdSql.checkRows(2) - tdSql.checkData(0, 1, 3) - # tdSql.query("select * from ntb where ts>today()") - # tdSql.checkRows(1) - tdSql.query("select c4 from ntb where c4=today()") - tdSql.checkRows(1) - tdSql.checkData(0, 0, str(today_date)) - tdSql.query("select today() from ntb where ts=today()") - # tdSql.checkRows(2) - # tdSql.query("select * from ntb where ts>today()") - # tdSql.checkRows(1) - tdSql.query("select c4 from stb where c4=today()") - tdSql.checkRows(1) - tdSql.checkData(0, 0, str(today_date)) - tdSql.query("select today() from stb where ts=today()") - tdSql.checkRows(2) - tdSql.query("select * from ntb where ts>today()") - tdSql.checkRows(1) - tdSql.query("select c4 from stb_1 where c4=today()") - tdSql.checkRows(1) - tdSql.query("select today() from stb_1 where ts 2022-01-01 00:00:00.000 + 'event':'', + 'columnDict': {'int':2}, + 'tagDict': {'int':1} + } + + cdbName = 'cdb' + # some parameter to consumer processor + consumerId = 0 + expectrowcnt = 0 + topicList = '' + ifcheckdata = 0 + ifManualCommit = 1 + groupId = 'group.id:cgrp1' + autoCommit = 'enable.auto.commit:false' + autoCommitInterval = 'auto.commit.interval.ms:1000' + autoOffset = 'auto.offset.reset:earliest' + + pollDelay = 20 + showMsg = 1 + showRow = 1 + + hostname = socket.gethostname() + + def init(self, conn, logSql): + tdLog.debug(f"start to excute {__file__}") + logSql = False + tdSql.init(conn.cursor(), logSql) + + def tmqCase12(self): + tdLog.printNoPrefix("======== test case 12: ") + tdLog.info("step 1: create database, stb, ctb and insert data") + + tmqCom.initConsumerTable(self.cdbName) + + tdCom.create_database(tdSql,self.paraDict["dbName"],self.paraDict["dropFlag"], self.paraDict['precision']) + + self.paraDict["stbName"] = 'stb1' + tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) + tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) + tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) + + self.paraDict["stbName"] = 'stb2' + tdCom.create_stable(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["columnDict"],self.paraDict["tagDict"]) + tdCom.create_ctables(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["tagDict"]) + tdCom.insert_data(tdSql,self.paraDict["dbName"],self.paraDict["stbName"],self.paraDict["ctbNum"],self.paraDict["rowsPerTbl"],self.paraDict["batchNum"]) + + tdLog.info("create topics from db") + topicName1 = 'topic_%s'%(self.paraDict['dbName']) + tdSql.execute("create topic %s as database %s" %(topicName1, self.paraDict['dbName'])) + + topicList = topicName1 + keyList = '%s,%s,%s,%s'%(self.groupId,self.autoCommit,self.autoCommitInterval,self.autoOffset) + self.expectrowcnt = self.paraDict["rowsPerTbl"] * self.paraDict["ctbNum"] * 2 + tmqCom.insertConsumerInfo(self.consumerId, self.expectrowcnt,topicList,keyList,self.ifcheckdata,self.ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName) + + tdLog.info("After waiting for a period of time, drop one stable") + time.sleep(10) + tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName'])) + + tdLog.info("wait result from consumer, then check it") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if not (totalConsumeRows >= self.expectrowcnt/2 and totalConsumeRows <= self.expectrowcnt): + tdLog.info("act consume rows: %d, expect consume rows: between %d and %d"%(totalConsumeRows, self.expectrowcnt/2, self.expectrowcnt)) + tdLog.exit("tmq consume rows error!") + + time.sleep(15) + tdSql.query("drop topic %s"%topicName1) + + tdLog.printNoPrefix("======== test case 12 end ...... ") + + def run(self): + tdSql.prepare() + self.tmqCase12() + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/subscribeStb4.py b/tests/system-test/7-tmq/subscribeStb4.py index d8a9ca95b086372fe581176f5dc7e220d1a71a98..f3982c8f1f7e2dcdd28f98100ed3817013901b74 100644 --- a/tests/system-test/7-tmq/subscribeStb4.py +++ b/tests/system-test/7-tmq/subscribeStb4.py @@ -196,16 +196,16 @@ class TDTestCase: auotCtbPrefix = 'autoCtb' # create and start thread - parameterDict = {'cfg': '', \ - 'actionType': 0, \ - 'dbName': 'db1', \ - 'dropFlag': 1, \ - 'vgroups': 4, \ - 'replica': 1, \ - 'stbName': 'stb1', \ - 'ctbNum': 10, \ - 'rowsPerTbl': 10000, \ - 'batchNum': 100, \ + parameterDict = {'cfg': '', + 'actionType': 0, + 'dbName': 'db1', + 'dropFlag': 1, + 'vgroups': 4, + 'replica': 1, + 'stbName': 'stb1', + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 100, 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath @@ -349,7 +349,5 @@ class TDTestCase: tdSql.close() tdLog.success(f"{__file__} successfully executed") -event = threading.Event() - tdCases.addLinux(__file__, TDTestCase()) tdCases.addWindows(__file__, TDTestCase()) diff --git a/tests/system-test/7-tmq/tmqCommon.py b/tests/system-test/7-tmq/tmqCommon.py new file mode 100644 index 0000000000000000000000000000000000000000..68adaa412603e71413152257ad7d005d4017c753 --- /dev/null +++ b/tests/system-test/7-tmq/tmqCommon.py @@ -0,0 +1,118 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +from collections import defaultdict +import random +import string +import threading +import requests +import time +# import socketfrom + +import taos +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * + +# class actionType(Enum): +# CREATE_DATABASE = 0 +# CREATE_STABLE = 1 +# CREATE_CTABLE = 2 +# INSERT_DATA = 3 + +class TMQCom: + def init(self, conn, logSql): + tdSql.init(conn.cursor()) + # tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def initConsumerTable(self,cdbName='cdb'): + tdLog.info("create consume database, and consume info table, and consume result table") + tdSql.query("create database if not exists %s vgroups 1"%(cdbName)) + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + def initConsumerInfoTable(self,cdbName='cdb'): + tdLog.info("drop consumeinfo table") + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) + tdLog.info("consume info sql: %s"%sql) + tdSql.query(sql) + + def selectConsumeResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 3)) + + return resultList + + def startTmqSimProcess(self,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): + buildPath = tdCom.getBuildPath() + cfgPath = tdCom.getClientCfgPath() + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + def syncCreateDbStbCtbInsertData(self, tsql, paraDict): + tdCom.create_database(tsql, paraDict["dbName"],paraDict["dropFlag"], paraDict['precision']) + tdCom.create_stable(tsql, paraDict["dbName"],paraDict["stbName"], paraDict["columnDict"], paraDict["tagDict"]) + tdCom.create_ctables(tsql, paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["tagDict"]) + if "event" in paraDict and type(paraDict['event']) == type(threading.Event()): + paraDict["event"].set() + tdCom.insert_data(tsql,paraDict["dbName"],paraDict["stbName"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"]) + return + + def threadFunction(self, **paraDict): + # create new connector for new tdSql instance in my thread + newTdSql = tdCom.newTdSql() + self.syncCreateDbStbCtbInsertData(self, newTdSql, paraDict) + return + + def asyncCreateDbStbCtbInsertData(self, paraDict): + pThread = threading.Thread(target=self.threadFunction, kwargs=paraDict) + pThread.start() + return pThread + + def close(self): + self.cursor.close() + +tmqCom = TMQCom()