diff --git a/cmake/cmake.define b/cmake/cmake.define index c9a188600a16867ba141b88bc82b5251f48efab7..fb6ba8cc2e2703d17e45593a925fffdcbcb5bbb8 100644 --- a/cmake/cmake.define +++ b/cmake/cmake.define @@ -1,6 +1,6 @@ cmake_minimum_required(VERSION 3.16) -set(CMAKE_VERBOSE_MAKEFILE ON) +set(CMAKE_VERBOSE_MAKEFILE OFF) #set output directory SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index b4a290cbfca7d6c4b7d26d4db9c8dea752159326..829770ed1b089e2e2e9da6a6b18f0d0b1a54c631 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -46,7 +46,7 @@ typedef struct SScanLogicNode { struct STableMeta* pMeta; SVgroupsInfo* pVgroupList; EScanType scanType; - uint8_t scanFlag; // denotes reversed scan of data or not + uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count STimeWindow scanRange; SName tableName; bool showRewrite; @@ -189,9 +189,6 @@ typedef struct SScanPhysiNode { SNodeList* pScanCols; uint64_t uid; // unique id of the table int8_t tableType; - int32_t order; // scan order: TSDB_ORDER_ASC|TSDB_ORDER_DESC - int32_t count; // repeat count - int32_t reverse; // reverse scan count SName tableName; } SScanPhysiNode; @@ -207,7 +204,7 @@ typedef struct SSystemTableScanPhysiNode { typedef struct STableScanPhysiNode { SScanPhysiNode scan; - uint8_t scanFlag; // denotes reversed scan of data or not + uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count STimeWindow scanRange; double ratio; int32_t dataRequired; diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 2f6b9e1866a6aadda2009a727ace22c0c5bcdc0e..78d09115023490c35249129767a878ec1fc539e9 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -37,6 +37,8 @@ typedef struct SPlanContext { bool isStmtQuery; void* pTransporter; struct SCatalog* pCatalog; + char* pMsg; + int32_t msgLen; } SPlanContext; // Create the physical plan for the query, according to the AST. diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 03fb43a46b5314e0b118675b16131031c23622aa..3ac440346c503427767fa5e0a2a60afd24e41601 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -623,6 +623,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_INVALID_DAYS_VALUE TAOS_DEF_ERROR_CODE(0, 0x2636) #define TSDB_CODE_PAR_OFFSET_LESS_ZERO TAOS_DEF_ERROR_CODE(0, 0x2637) #define TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY TAOS_DEF_ERROR_CODE(0, 0x2638) +#define TSDB_CODE_PAR_INVALID_TOPIC_QUERY TAOS_DEF_ERROR_CODE(0, 0x2639) //planner #define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index fc4192d818dca7f46ac43acc2b2014247f44549a..367baef53f0d8ca24a7da4a23952e0f7bd5b633d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -232,7 +232,9 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra .mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp), .pAstRoot = pQuery->pRoot, .showRewrite = pQuery->showRewrite, - .pTransporter = pRequest->pTscObj->pAppInfo->pTransporter + .pTransporter = pRequest->pTscObj->pAppInfo->pTransporter, + .pMsg = pRequest->msgBuf, + .msgLen = ERROR_MSG_BUF_DEFAULT_SIZE }; int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &cxt.pCatalog); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index af4b9d389e10aa116e970f387c59e7b6a0c78d50..e76d7fcd4f0393601988f0060ddf0766e928e33e 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -146,10 +146,10 @@ int32_t dmProcessCreateNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMs dError("node:%s, failed to create since %s", pWrapper->name, terrstr()); } else { dDebug("node:%s, has been created", pWrapper->name); + (void)dmOpenNode(pWrapper); pWrapper->required = true; pWrapper->deployed = true; pWrapper->procType = pDnode->ptype; - (void)dmOpenNode(pWrapper); } taosThreadMutexUnlock(&pDnode->mutex); @@ -171,13 +171,13 @@ int32_t dmProcessDropNodeReq(SDnode *pDnode, EDndNodeType ntype, SNodeMsg *pMsg) dError("node:%s, failed to drop since %s", pWrapper->name, terrstr()); } else { dDebug("node:%s, has been dropped", pWrapper->name); + pWrapper->required = false; + pWrapper->deployed = false; } dmReleaseWrapper(pWrapper); if (code == 0) { - pWrapper->required = false; - pWrapper->deployed = false; dmCloseNode(pWrapper); taosRemoveDir(pWrapper->path); } diff --git a/source/dnode/mgmt/implement/src/dmTransport.c b/source/dnode/mgmt/implement/src/dmTransport.c index 313d1c008ab409ad26b4f010ded4f51b7dddc969..fb8e5e7fb23d39353eb317da9dec72416da8b01e 100644 --- a/source/dnode/mgmt/implement/src/dmTransport.c +++ b/source/dnode/mgmt/implement/src/dmTransport.c @@ -71,12 +71,15 @@ static void dmProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSe SNodeMsg *pMsg = NULL; NodeMsgFp msgFp = NULL; uint16_t msgType = pRpc->msgType; + bool needRelease = false; if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) { dmSetMnodeEpSet(pWrapper->pDnode, pEpSet); } if (dmMarkWrapper(pWrapper) != 0) goto _OVER; + + needRelease = true; if ((msgFp = dmGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER; if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER; if (dmBuildMsg(pMsg, pRpc) != 0) goto _OVER; @@ -119,7 +122,9 @@ _OVER: rpcFreeCont(pRpc->pCont); } - dmReleaseWrapper(pWrapper); + if (needRelease) { + dmReleaseWrapper(pWrapper); + } } static void dmProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 273b7447ffe781075a3956a83237e9f563eb88e4..bc9fe46c0f34f1b9b293af212c221064752b7602 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -1510,7 +1510,6 @@ static int32_t tsdbGetTSmaDataImpl(STsdb *pTsdb, char *pData, int64_t indexUid, } STSma *pTSma = pItem->pSma; - #endif STSmaReadH tReadH = {0}; diff --git a/source/libs/command/src/explain.c b/source/libs/command/src/explain.c index 4853bb4eb387750f8cd7fe168507acff0b805b47..75084e76106ac69494605dd434370c7ef59a8d79 100644 --- a/source/libs/command/src/explain.c +++ b/source/libs/command/src/explain.c @@ -344,11 +344,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pTagScanNode->count); - if (pTagScanNode->reverse) { - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pTagScanNode->reverse); - } EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); @@ -361,10 +356,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTagScanNode->order)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - if (pResNode->pExecInfo) { QRY_ERR_RET(qExplainBufAppendVerboseExecInfo(pResNode->pExecInfo, tbuf, &tlen)); if (tlen) { @@ -388,11 +379,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pTblScanNode->scan.count); - if (pTblScanNode->scan.reverse) { - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pTblScanNode->scan.reverse); - } EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); @@ -405,10 +391,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pTblScanNode->scan.order)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_TIMERANGE_FORMAT, pTblScanNode->scanRange.skey, pTblScanNode->scanRange.ekey); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); @@ -434,11 +416,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pSTblScanNode->scan.node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND(EXPLAIN_LOOPS_FORMAT, pSTblScanNode->scan.count); - if (pSTblScanNode->scan.reverse) { - EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); - EXPLAIN_ROW_APPEND(EXPLAIN_REVERSE_FORMAT, pSTblScanNode->scan.reverse); - } EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); @@ -451,10 +428,6 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i EXPLAIN_ROW_END(); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - EXPLAIN_ROW_NEW(level + 1, EXPLAIN_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pSTblScanNode->scan.order)); - EXPLAIN_ROW_END(); - QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1)); - if (pSTblScanNode->scan.node.pConditions) { EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT); QRY_ERR_RET(nodesNodeToSQL(pSTblScanNode->scan.node.pConditions, tbuf + VARSTR_HEADER_SIZE, TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen)); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index db090e6464194920f2a1b2c9563868f58cad8fef..32c3a60b0fa6f1b2e023246adddeb7465fcc8221 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1121,6 +1121,10 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt // todo avoid case: top(k, 12), 12 is the value parameter. // sum(11), 11 is also the value parameter. if (createDummyCol && pOneExpr->base.numOfParams == 1) { + pInput->totalRows = pBlock->info.rows; + pInput->numOfRows = pBlock->info.rows; + pInput->startRowIndex = 0; + code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); if (code != TSDB_CODE_SUCCESS) { return code; @@ -6571,9 +6575,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo .offset = pTableScanNode->offset, }; - return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pTableScanNode->dataRequired, - pScanPhyNode->count, pScanPhyNode->reverse, pColList, pResBlock, - pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo); + return createTableScanOperatorInfo(pDataReader, pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC, + numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq[0], pTableScanNode->scanSeq[1], pColList, + pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc); @@ -6721,7 +6725,7 @@ static tsdbReaderT createDataReaderImpl(STableScanPhysiNode* pTableScanNode, STa void* readHandle, uint64_t queryId, uint64_t taskId) { STsdbQueryCond cond = {.loadExternalRows = false}; - cond.order = pTableScanNode->scan.order; + cond.order = pTableScanNode->scanSeq[0] > 0 ? TSDB_ORDER_ASC : TSDB_ORDER_DESC; cond.numOfCols = LIST_LENGTH(pTableScanNode->scan.pScanCols); cond.colList = taosMemoryCalloc(cond.numOfCols, sizeof(SColumnInfo)); if (cond.colList == NULL) { diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index db92740fff260f07dbc438166b7d04682bbea136..40393a4eabb83c37bc555da0eddf35e0d5c2233e 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -40,6 +40,11 @@ bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t minFunction(SqlFunctionCtx* pCtx); int32_t maxFunction(SqlFunctionCtx *pCtx); +bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +int32_t avgFunction(SqlFunctionCtx* pCtx); +int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId); + bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); int32_t stddevFunction(SqlFunctionCtx* pCtx); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 3da5dc3289e246d6202c984bbc64f40eecde2832..70087ee46bf3b2f49a8876372c1df75c8cdb582a 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -104,7 +104,7 @@ static int32_t translateCount(SFunctionNode* pFunc, char* pErrBuf, int32_t len) if (1 != LIST_LENGTH(pFunc->pParameterList)) { return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName); } - pFunc->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT}; + pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT}; return TSDB_CODE_SUCCESS; } @@ -479,6 +479,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { .processFunc = stddevFunction, .finalizeFunc = stddevFinalize }, + { + .name = "avg", + .type = FUNCTION_TYPE_AVG, + .classification = FUNC_MGT_AGG_FUNC, + .translateFunc = translateInNumOutDou, + .getEnvFunc = getAvgFuncEnv, + .initFunc = avgFunctionSetup, + .processFunc = avgFunction, + .finalizeFunc = avgFinalize + }, { .name = "percentile", .type = FUNCTION_TYPE_PERCENTILE, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3dc2e62e92e4b8e28d4178032b4aaed81d050a8a..63af2fb97225014483a328b9e700eaf58ef20683 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -20,6 +20,59 @@ #include "tdatablock.h" #include "tpercentile.h" +typedef struct SSumRes { + union { + int64_t isum; + uint64_t usum; + double dsum; + }; +} SSumRes; + +typedef struct SAvgRes { + double result; + SSumRes sum; + int64_t count; +} SAvgRes; + +typedef struct STopBotResItem { + SVariant v; + uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data + struct { + int32_t pageId; + int32_t offset; + } tuplePos; // tuple data of this chosen row +} STopBotResItem; + +typedef struct STopBotRes { + int32_t pageId; +// int32_t num; + STopBotResItem *pItems; +} STopBotRes; + +typedef struct SStddevRes { + double result; + int64_t count; + union {double quadraticDSum; int64_t quadraticISum;}; + union {double dsum; int64_t isum;}; +} SStddevRes; + +typedef struct SPercentileInfo { + double result; + tMemBucket *pMemBucket; + int32_t stage; + double minval; + double maxval; + int64_t numOfElems; +} SPercentileInfo; + +typedef struct SDiffInfo { + bool hasPrev; + bool includeNull; + bool ignoreNegative; + bool firstOutput; + union { int64_t i64; double d64;} prev; +} SDiffInfo; + #define SET_VAL(_info, numOfElem, res) \ do { \ if ((numOfElem) <= 0) { \ @@ -28,13 +81,50 @@ (_info)->numOfRes = (res); \ } while (0) -typedef struct SSumRes { - union { - int64_t isum; - uint64_t usum; - double dsum; - }; -} SSumRes; +#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList)) +#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)]) + +#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ + do { \ + for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ + SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ + __ctx->fpSet.process(__ctx); \ + } \ + } while (0); + +#define DO_UPDATE_SUBSID_RES(ctx, ts) \ + do { \ + for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \ + SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ + if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ + __ctx->tag.i = (ts); \ + __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ + } \ + __ctx->fpSet.process(__ctx); \ + } \ + } while (0) + +#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ + do { \ + if (((left) < (right)) ^ (sign)) { \ + (left) = (right); \ + DO_UPDATE_SUBSID_RES(ctx, _ts); \ + (num) += 1; \ + } \ + } while (0) + +#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \ + do { \ + _t *d = (_t *)((_col)->pData); \ + for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \ + if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \ + continue; \ + } \ + TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \ + UPDATE_DATA(ctx, val, d[i], num, sign, ts); \ + } \ + } while (0) + bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { if (pResultInfo->initialized) { @@ -135,7 +225,7 @@ int32_t sumFunction(SqlFunctionCtx *pCtx) { int32_t type = pInput->pData[0]->info.type; SSumRes* pSumRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - + if (pInput->colDataAggIsSet) { numOfElem = pInput->numOfRows - pAgg->numOfNull; ASSERT(numOfElem >= 0); @@ -190,6 +280,145 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } +bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { + pEnv->calcMemSize = sizeof(double); + return true; +} + +bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo) { + if (!functionSetup(pCtx, pResultInfo)) { + return false; + } + + SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo); + memset(pRes, 0, sizeof(SAvgRes)); + return true; +} + +int32_t avgFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElem = 0; + + // Only the pre-computing information loaded and actual data does not loaded + SInputColumnInfoData* pInput = &pCtx->input; + int32_t type = pInput->pData[0]->info.type; + + SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + + // computing based on the true data block + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; + + switch (type) { + case TSDB_DATA_TYPE_TINYINT: { + int8_t* plist = (int8_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.isum += plist[i]; + } + + break; + } + + case TSDB_DATA_TYPE_SMALLINT: { + int16_t* plist = (int16_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.isum += plist[i]; + } + break; + } + + case TSDB_DATA_TYPE_INT: { + int32_t* plist = (int32_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.isum += plist[i]; + } + + break; + } + + case TSDB_DATA_TYPE_BIGINT: { + int64_t* plist = (int64_t*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.isum += plist[i]; + } + break; + } + + case TSDB_DATA_TYPE_FLOAT: { + float* plist = (float*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.dsum += plist[i]; + } + break; + } + + case TSDB_DATA_TYPE_DOUBLE: { + double* plist = (double*)pCol->pData; + for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElem += 1; + pAvgRes->count += 1; + pAvgRes->sum.dsum += plist[i]; + } + break; + } + + default: + break; + } + + // data in the check operation are all null, not output + SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); + return TSDB_CODE_SUCCESS; +} + +int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { + SInputColumnInfoData* pInput = &pCtx->input; + int32_t type = pInput->pData[0]->info.type; + SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + if (IS_INTEGER_TYPE(type)) { + pAvgRes->result = pAvgRes->sum.isum / ((double) pAvgRes->count); + } else { + pAvgRes->result = pAvgRes->sum.dsum / ((double) pAvgRes->count); + } + + return functionFinalize(pCtx, pBlock, slotId); +} + EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow){ return FUNC_DATA_REQUIRED_STATIS_LOAD; } @@ -292,49 +521,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -#define GET_TS_LIST(x) ((TSKEY*)((x)->ptsList)) -#define GET_TS_DATA(x, y) (GET_TS_LIST(x)[(y)]) - -#define DO_UPDATE_TAG_COLUMNS_WITHOUT_TS(ctx) \ - do { \ - for (int32_t _i = 0; _i < (ctx)->tagInfo.numOfTagCols; ++_i) { \ - SqlFunctionCtx *__ctx = (ctx)->tagInfo.pTagCtxList[_i]; \ - __ctx->fpSet.process(__ctx); \ - } \ - } while (0); - -#define DO_UPDATE_SUBSID_RES(ctx, ts) \ - do { \ - for (int32_t _i = 0; _i < (ctx)->subsidiaryRes.numOfCols; ++_i) { \ - SqlFunctionCtx *__ctx = (ctx)->subsidiaryRes.pCtx[_i]; \ - if (__ctx->functionId == FUNCTION_TS_DUMMY) { \ - __ctx->tag.i = (ts); \ - __ctx->tag.nType = TSDB_DATA_TYPE_BIGINT; \ - } \ - __ctx->fpSet.process(__ctx); \ - } \ - } while (0) - -#define UPDATE_DATA(ctx, left, right, num, sign, _ts) \ - do { \ - if (((left) < (right)) ^ (sign)) { \ - (left) = (right); \ - DO_UPDATE_SUBSID_RES(ctx, _ts); \ - (num) += 1; \ - } \ - } while (0) - -#define LOOPCHECK_N(val, _col, ctx, _t, _nrow, _start, sign, num) \ - do { \ - _t *d = (_t *)((_col)->pData); \ - for (int32_t i = (_start); i < (_nrow) + (_start); ++i) { \ - if (((_col)->hasNull) && colDataIsNull_f((_col)->nullbitmap, i)) { \ - continue; \ - } \ - TSKEY ts = (ctx)->ptsList != NULL ? GET_TS_DATA(ctx, i) : 0; \ - UPDATE_DATA(ctx, val, d[i], num, sign, ts); \ - } \ - } while (0) int32_t doMinMaxHelper(SqlFunctionCtx *pCtx, int32_t isMinFunc) { int32_t numOfElems = 0; @@ -479,13 +665,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } -typedef struct SStddevRes { - double result; - int64_t count; - union {double quadraticDSum; int64_t quadraticISum;}; - union {double dsum; int64_t isum;}; -} SStddevRes; - bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SStddevRes); return true; @@ -588,8 +767,8 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { numOfElem += 1; pStddevRes->count += 1; - pStddevRes->isum += plist[i]; - pStddevRes->quadraticISum += plist[i] * plist[i]; + pStddevRes->dsum += plist[i]; + pStddevRes->quadraticDSum += plist[i] * plist[i]; } break; } @@ -603,8 +782,8 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { numOfElem += 1; pStddevRes->count += 1; - pStddevRes->isum += plist[i]; - pStddevRes->quadraticISum += plist[i] * plist[i]; + pStddevRes->dsum += plist[i]; + pStddevRes->quadraticDSum += plist[i] * plist[i]; } break; } @@ -619,21 +798,21 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { } int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId) { + SInputColumnInfoData* pInput = &pCtx->input; + int32_t type = pInput->pData[0]->info.type; SStddevRes* pStddevRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - double avg = pStddevRes->isum / ((double) pStddevRes->count); - pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg); + double avg; + if (IS_INTEGER_TYPE(type)) { + avg = pStddevRes->isum / ((double) pStddevRes->count); + pStddevRes->result = sqrt(pStddevRes->quadraticISum/((double)pStddevRes->count) - avg*avg); + } else { + avg = pStddevRes->dsum / ((double) pStddevRes->count); + pStddevRes->result = sqrt(pStddevRes->quadraticDSum/((double)pStddevRes->count) - avg*avg); + } + return functionFinalize(pCtx, pBlock, slotId); } -typedef struct SPercentileInfo { - double result; - tMemBucket *pMemBucket; - int32_t stage; - double minval; - double maxval; - int64_t numOfElems; -} SPercentileInfo; - bool getPercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SPercentileInfo); return true; @@ -928,14 +1107,6 @@ int32_t lastFunction(SqlFunctionCtx *pCtx) { return TSDB_CODE_SUCCESS; } -typedef struct SDiffInfo { - bool hasPrev; - bool includeNull; - bool ignoreNegative; - bool firstOutput; - union { int64_t i64; double d64;} prev; -} SDiffInfo; - bool getDiffFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { pEnv->calcMemSize = sizeof(SDiffInfo); return true; @@ -1168,21 +1339,6 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { } } -typedef struct STopBotResItem { - SVariant v; - uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data - struct { - int32_t pageId; - int32_t offset; - } tuplePos; // tuple data of this chosen row -} STopBotResItem; - -typedef struct STopBotRes { - int32_t pageId; -// int32_t num; - STopBotResItem *pItems; -} STopBotRes; - bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1); pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * sizeof(STopBotResItem); @@ -1335,4 +1491,4 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t slotId return pEntryInfo->numOfRes; // return functionFinalize(pCtx, pBlock, slotId); -} \ No newline at end of file +} diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 676304f9a809454af6c1abe423326fd6b5ec8abb..046a5368d967b17902b6aba5d3623992097e9cb9 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -671,9 +671,6 @@ static int32_t jsonToName(const SJson* pJson, void* pObj) { static const char* jkScanPhysiPlanScanCols = "ScanCols"; static const char* jkScanPhysiPlanTableId = "TableId"; static const char* jkScanPhysiPlanTableType = "TableType"; -static const char* jkScanPhysiPlanScanOrder = "ScanOrder"; -static const char* jkScanPhysiPlanScanCount = "ScanCount"; -static const char* jkScanPhysiPlanReverseScanCount = "ReverseScanCount"; static const char* jkScanPhysiPlanTableName = "TableName"; static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) { @@ -689,15 +686,6 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType); } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanOrder, pNode->order); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanScanCount, pNode->count); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanReverseScanCount, pNode->reverse); - } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName); } @@ -718,15 +706,6 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType); } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetIntValue(pJson, jkScanPhysiPlanScanOrder, &pNode->order); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetIntValue(pJson, jkScanPhysiPlanScanCount, &pNode->count); - } - if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetIntValue(pJson, jkScanPhysiPlanReverseScanCount, &pNode->reverse); - } if (TSDB_CODE_SUCCESS == code) { code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName); } @@ -742,7 +721,8 @@ static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) { return jsonToPhysiScanNode(pJson, pObj); } -static const char* jkTableScanPhysiPlanScanFlag = "ScanFlag"; +static const char* jkTableScanPhysiPlanScanCount = "ScanCount"; +static const char* jkTableScanPhysiPlanReverseScanCount = "ReverseScanCount"; static const char* jkTableScanPhysiPlanStartKey = "StartKey"; static const char* jkTableScanPhysiPlanEndKey = "EndKey"; static const char* jkTableScanPhysiPlanRatio = "Ratio"; @@ -759,7 +739,10 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { int32_t code = physiScanNodeToJson(pObj, pJson); if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanScanFlag, pNode->scanFlag); + code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanScanCount, pNode->scanSeq[0]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanReverseScanCount, pNode->scanSeq[1]); } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanStartKey, pNode->scanRange.skey); @@ -800,7 +783,10 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { int32_t code = jsonToPhysiScanNode(pJson, pObj); if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanScanFlag, &pNode->scanFlag); + code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanScanCount, &pNode->scanSeq[0]); + } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetUTinyIntValue(pJson, jkTableScanPhysiPlanReverseScanCount, &pNode->scanSeq[1]); } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBigIntValue(pJson, jkTableScanPhysiPlanStartKey, &pNode->scanRange.skey); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index b1cf5759b72010f99b348f82e5f80d642d7c6e14..20e470bd1dbe1fcba1e72fc50d9f76025a4fad80 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -611,6 +611,7 @@ function_expression(A) ::= function_name(B) NK_LP expression_list(C) NK_RP(D). function_expression(A) ::= star_func(B) NK_LP star_func_para_list(C) NK_RP(D). { A = createRawExprNodeExt(pCxt, &B, &D, createFunctionNode(pCxt, &B, C)); } function_expression(A) ::= CAST(B) NK_LP expression(C) AS type_name(D) NK_RP(E). { A = createRawExprNodeExt(pCxt, &B, &E, createCastFunctionNode(pCxt, releaseRawExprNode(pCxt, C), D)); } function_expression(A) ::= noarg_func(B) NK_LP NK_RP(C). { A = createRawExprNodeExt(pCxt, &B, &C, createFunctionNodeNoArg(pCxt, &B)); } +//function_expression(A) ::= NOW(B). { A = createRawExprNode(pCxt, &B, createFunctionNode(pCxt, &B, NULL)); } %type noarg_func { SToken } %destructor noarg_func { } diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index ef1f9ada011dfa18285c8b5660cb79c0020a8d0a..51e0daf4adfeeb273112d3e88bcd31d43daf1408 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -83,7 +83,7 @@ static EDealRes calcConstOperator(SOperatorNode** pNode, void* pContext) { static EDealRes calcConstFunction(SFunctionNode** pNode, void* pContext) { SFunctionNode* pFunc = *pNode; - if (!fmIsScalarFunc(pFunc->funcId)) { + if (!fmIsScalarFunc(pFunc->funcId) || fmIsUserDefinedFunc(pFunc->funcId)) { return DEAL_RES_CONTINUE; } SNode* pParam = NULL; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index fecfb74947dd95d64ae4787a3bc50c854f655db8..a3f8c3aed5eed8de0732ac7a9a1d8dc827af5a6e 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -2616,37 +2616,62 @@ static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponen (FSerializeFunc)tSerializeSCreateDropMQSBNodeReq, &dropReq); } -static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) { - SCMCreateTopicReq createReq = {0}; +static int32_t buildCreateTopicReq(STranslateContext* pCxt, SCreateTopicStmt* pStmt, SCMCreateTopicReq* pReq) { + SName name; + // tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->topicName, strlen(pStmt->topicName)); + // tNameGetFullDbName(&name, pReq->name); + tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), pReq->name); + pReq->igExists = pStmt->ignoreExists; + pReq->withTbName = pStmt->pOptions->withTable; + pReq->withSchema = pStmt->pOptions->withSchema; + pReq->withTag = pStmt->pOptions->withTag; + + pReq->sql = strdup(pCxt->pParseCxt->pSql); + if (NULL == pReq->sql) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t code = TSDB_CODE_SUCCESS; if (NULL != pStmt->pQuery) { + strcpy(pReq->subscribeDbName, ((SRealTableNode*)(((SSelectStmt*)pStmt->pQuery)->pFromTable))->table.dbName); pCxt->pParseCxt->topicQuery = true; - int32_t code = translateQuery(pCxt, pStmt->pQuery); + code = translateQuery(pCxt, pStmt->pQuery); if (TSDB_CODE_SUCCESS == code) { - code = nodesNodeToString(pStmt->pQuery, false, &createReq.ast, NULL); - } - if (TSDB_CODE_SUCCESS != code) { - return code; + code = nodesNodeToString(pStmt->pQuery, false, &pReq->ast, NULL); } } else { - strcpy(createReq.subscribeDbName, pStmt->subscribeDbName); + strcpy(pReq->subscribeDbName, pStmt->subscribeDbName); } - createReq.sql = strdup(pCxt->pParseCxt->pSql); - if (NULL == createReq.sql) { - return TSDB_CODE_OUT_OF_MEMORY; + return code; +} + +static int32_t checkCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) { + if (NULL == pStmt->pQuery) { + return TSDB_CODE_SUCCESS; } - SName name; - // tNameSetDbName(&name, pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, strlen(pCxt->pParseCxt->db)); - // tNameGetFullDbName(&name, createReq.name); - tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pCxt->pParseCxt->db, pStmt->topicName, &name), createReq.name); - createReq.igExists = pStmt->ignoreExists; - createReq.withTbName = pStmt->pOptions->withTable; - createReq.withSchema = pStmt->pOptions->withSchema; - createReq.withTag = pStmt->pOptions->withTag; + if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) { + SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; + if (!pSelect->isDistinct && QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable) && NULL == pSelect->pGroupByList && + NULL == pSelect->pLimit && NULL == pSelect->pSlimit && NULL == pSelect->pOrderByList && NULL == pSelect->pPartitionByList) { + return TSDB_CODE_SUCCESS; + } + } - int32_t code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq); + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TOPIC_QUERY); +} + +static int32_t translateCreateTopic(STranslateContext* pCxt, SCreateTopicStmt* pStmt) { + SCMCreateTopicReq createReq = {0}; + int32_t code = checkCreateTopic(pCxt, pStmt); + if (TSDB_CODE_SUCCESS == code) { + code = buildCreateTopicReq(pCxt, pStmt, &createReq); + } + if (TSDB_CODE_SUCCESS == code) { + code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TOPIC, (FSerializeFunc)tSerializeSCMCreateTopicReq, &createReq); + } tFreeSCMCreateTopicReq(&createReq); return code; } diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 7e41bbe3fd5a54358c379178422e19be3c03a61d..fd700f8d461ff2dd38541d40856102c82acd87a1 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -128,6 +128,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "soffset/offset can not be less than 0"; case TSDB_CODE_PAR_SLIMIT_LEAK_PARTITION_BY: return "slimit/soffset only available for PARTITION BY query"; + case TSDB_CODE_PAR_INVALID_TOPIC_QUERY: + return "Invalid topic query"; case TSDB_CODE_OUT_OF_MEMORY: return "Out of memory"; default: diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 2c7ad27927978ba24b83d3d01807a82afaec68fc..ac5188170af6d148cf0956a670ffe0d92197f568 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -199,7 +199,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect TSWAP(pScan->pMeta, pRealTable->pMeta, STableMeta*); TSWAP(pScan->pVgroupList, pRealTable->pVgroupList, SVgroupsInfo*); - pScan->scanFlag = MAIN_SCAN; + pScan->scanSeq[0] = 1; + pScan->scanSeq[1] = 0; pScan->scanRange = TSWINDOW_INITIALIZER; pScan->tableName.type = TSDB_TABLE_NAME_T; pScan->tableName.acctId = pCxt->pPlanCxt->acctId; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 832135e90eaa9bdf90c15c48158f784fee3ba1ac..21e55fc34adb40066551008366ebfb067789b780 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -20,11 +20,14 @@ #define OPTIMIZE_FLAG_MASK(n) (1 << n) #define OPTIMIZE_FLAG_OSD OPTIMIZE_FLAG_MASK(0) +#define OPTIMIZE_FLAG_CPD OPTIMIZE_FLAG_MASK(1) +#define OPTIMIZE_FLAG_OPK OPTIMIZE_FLAG_MASK(2) #define OPTIMIZE_FLAG_SET_MASK(val, mask) (val) |= (mask) #define OPTIMIZE_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0) typedef struct SOptimizeContext { + SPlanContext* pPlanCxt; bool optimized; } SOptimizeContext; @@ -57,7 +60,23 @@ typedef enum ECondAction { // after supporting outer join, there are other possibilities } ECondAction; -EDealRes haveNormalColImpl(SNode* pNode, void* pContext) { +typedef bool (*FMayBeOptimized)(SLogicNode* pNode); + +static SLogicNode* optFindPossibleNode(SLogicNode* pNode, FMayBeOptimized func) { + if (func(pNode)) { + return pNode; + } + SNode* pChild; + FOREACH(pChild, pNode->pChildren) { + SLogicNode* pScanNode = optFindPossibleNode((SLogicNode*)pChild, func); + if (NULL != pScanNode) { + return pScanNode; + } + } + return NULL; +} + +EDealRes osdHaveNormalColImpl(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { *((bool*)pContext) = (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType); return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD; @@ -65,9 +84,9 @@ EDealRes haveNormalColImpl(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static bool haveNormalCol(SNodeList* pList) { +static bool osdHaveNormalCol(SNodeList* pList) { bool res = false; - nodesWalkExprsPostOrder(pList, haveNormalColImpl, &res); + nodesWalkExprsPostOrder(pList, osdHaveNormalColImpl, &res); return res; } @@ -89,21 +108,7 @@ static bool osdMayBeOptimized(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pNode->pParent)) { return (WINDOW_TYPE_INTERVAL == ((SWindowLogicNode*)pNode->pParent)->winType); } - return !haveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys); -} - -static SLogicNode* osdFindPossibleScanNode(SLogicNode* pNode) { - if (osdMayBeOptimized(pNode)) { - return pNode; - } - SNode* pChild; - FOREACH(pChild, pNode->pChildren) { - SLogicNode* pScanNode = osdFindPossibleScanNode((SLogicNode*)pChild); - if (NULL != pScanNode) { - return pScanNode; - } - } - return NULL; + return !osdHaveNormalCol(((SAggLogicNode*)pNode->pParent)->pGroupKeys); } static SNodeList* osdGetAllFuncs(SLogicNode* pNode) { @@ -138,7 +143,7 @@ static int32_t osdGetRelatedFuncs(SScanLogicNode* pScan, SNodeList** pSdrFuncs, } static int32_t osdMatch(SOptimizeContext* pCxt, SLogicNode* pLogicNode, SOsdInfo* pInfo) { - pInfo->pScan = (SScanLogicNode*)osdFindPossibleScanNode(pLogicNode); + pInfo->pScan = (SScanLogicNode*)optFindPossibleNode(pLogicNode, osdMayBeOptimized); if (NULL == pInfo->pScan) { return TSDB_CODE_SUCCESS; } @@ -345,7 +350,7 @@ static int32_t cpdCalcTimeRange(SScanLogicNode* pScan, SNode** pPrimaryKeyCond, } static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan) { - if (NULL == pScan->node.pConditions) { + if (NULL == pScan->node.pConditions || OPTIMIZE_FLAG_TEST_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) { return TSDB_CODE_SUCCESS; } @@ -359,7 +364,10 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* pScan->node.pConditions = pOtherCond; } - if (TSDB_CODE_SUCCESS != code) { + if (TSDB_CODE_SUCCESS == code) { + OPTIMIZE_FLAG_SET_MASK(pScan->node.optimizedFlag, OPTIMIZE_FLAG_CPD); + pCxt->optimized = true; + } else { nodesDestroyNode(pPrimaryKeyCond); nodesDestroyNode(pOtherCond); } @@ -367,7 +375,7 @@ static int32_t cpdOptimizeScanCondition(SOptimizeContext* pCxt, SScanLogicNode* return code; } -static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) { +static bool cpdBelongThisTable(SNode* pCondCol, SNodeList* pTableCols) { SNode* pTableCol = NULL; FOREACH(pTableCol, pTableCols) { if (nodesEqualNode(pCondCol, pTableCol)) { @@ -380,9 +388,9 @@ static bool belongThisTable(SNode* pCondCol, SNodeList* pTableCols) { static EDealRes cpdIsMultiTableCondImpl(SNode* pNode, void* pContext) { SCpdIsMultiTableCondCxt* pCxt = pContext; if (QUERY_NODE_COLUMN == nodeType(pNode)) { - if (belongThisTable(pNode, pCxt->pLeftCols)) { + if (cpdBelongThisTable(pNode, pCxt->pLeftCols)) { pCxt->havaLeftCol = true; - } else if (belongThisTable(pNode, pCxt->pRightCols)) { + } else if (cpdBelongThisTable(pNode, pCxt->pRightCols)) { pCxt->haveRightCol = true; } return pCxt->havaLeftCol && pCxt->haveRightCol ? DEAL_RES_END : DEAL_RES_CONTINUE; @@ -508,11 +516,76 @@ static int32_t cpdPushCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild, SN return TSDB_CODE_PLAN_INTERNAL_ERROR; } +static bool cpdIsPrimaryKey(SNode* pNode, SNodeList* pTableCols) { + if (QUERY_NODE_COLUMN != nodeType(pNode)) { + return false; + } + SColumnNode* pCol = (SColumnNode*)pNode; + if (PRIMARYKEY_TIMESTAMP_COL_ID != pCol->colId) { + return false; + } + return cpdBelongThisTable(pNode, pTableCols); +} + +static bool cpdIsPrimaryKeyEqualCond(SJoinLogicNode* pJoin, SNode* pCond) { + if (QUERY_NODE_OPERATOR != nodeType(pCond)) { + return false; + } + SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; + SNodeList* pRightCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1))->pTargets; + SOperatorNode* pOper = (SOperatorNode*)pJoin->pOnConditions; + if (cpdIsPrimaryKey(pOper->pLeft, pLeftCols)) { + return cpdIsPrimaryKey(pOper->pRight, pRightCols); + } else if (cpdIsPrimaryKey(pOper->pLeft, pRightCols)) { + return cpdIsPrimaryKey(pOper->pRight, pLeftCols); + } + return false; +} + +static int32_t cpdCheckOpCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SNode* pOnCond) { + if (!cpdIsPrimaryKeyEqualCond(pJoin, pOnCond)) { + snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression"); + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; +} + +static int32_t cpdCheckLogicCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin, SLogicConditionNode* pOnCond) { + if (LOGIC_COND_TYPE_AND != pOnCond->condType) { + snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression"); + return TSDB_CODE_FAILED; + } + SNode* pCond = NULL; + FOREACH(pCond, pOnCond->pParameterList) { + if (!cpdIsPrimaryKeyEqualCond(pJoin, pCond)) { + snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "l.ts = r.ts is expected in join expression"); + return TSDB_CODE_FAILED; + } + } + return TSDB_CODE_SUCCESS; +} + +static int32_t cpdCheckJoinOnCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { + if (NULL == pJoin->pOnConditions) { + snprintf(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, "not support cross join"); + return TSDB_CODE_FAILED; + } + if (QUERY_NODE_LOGIC_CONDITION == nodeType(pJoin->pOnConditions)) { + return cpdCheckLogicCond(pCxt, pJoin, (SLogicConditionNode*)pJoin->pOnConditions); + } else { + return cpdCheckOpCond(pCxt, pJoin, pJoin->pOnConditions); + } +} + static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) { - if (NULL == pJoin->node.pConditions) { + if (OPTIMIZE_FLAG_TEST_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD)) { return TSDB_CODE_SUCCESS; } + if (NULL == pJoin->node.pConditions) { + return cpdCheckJoinOnCond(pCxt, pJoin); + } + SNode* pOnCond = NULL; SNode* pLeftChildCond = NULL; SNode* pRightChildCond = NULL; @@ -527,7 +600,11 @@ static int32_t cpdPushJoinCondition(SOptimizeContext* pCxt, SJoinLogicNode* pJoi code = cpdPushCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1), &pRightChildCond); } - if (TSDB_CODE_SUCCESS != code) { + if (TSDB_CODE_SUCCESS == code) { + OPTIMIZE_FLAG_SET_MASK(pJoin->node.optimizedFlag, OPTIMIZE_FLAG_CPD); + pCxt->optimized = true; + code = cpdCheckJoinOnCond(pCxt, pJoin); + } else { nodesDestroyNode(pOnCond); nodesDestroyNode(pLeftChildCond); nodesDestroyNode(pRightChildCond); @@ -572,15 +649,129 @@ static int32_t cpdOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { return cpdPushCondition(pCxt, pLogicNode); } +static bool opkIsPrimaryKeyOrderBy(SNodeList* pSortKeys) { + if (1 != LIST_LENGTH(pSortKeys)) { + return false; + } + SNode* pNode = ((SOrderByExprNode*)nodesListGetNode(pSortKeys, 0))->pExpr; + return (QUERY_NODE_COLUMN == nodeType(pNode) ? (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pNode)->colId) : false); +} + +static bool opkSortMayBeOptimized(SLogicNode* pNode) { + if (QUERY_NODE_LOGIC_PLAN_SORT != nodeType(pNode)) { + return false; + } + if (OPTIMIZE_FLAG_TEST_MASK(pNode->optimizedFlag, OPTIMIZE_FLAG_OPK)) { + return false; + } + return true; +} + +static int32_t opkGetScanNodesImpl(SLogicNode* pNode, bool* pNotOptimize, SNodeList** pScanNodes) { + int32_t code = TSDB_CODE_SUCCESS; + + switch (nodeType(pNode)) { + case QUERY_NODE_LOGIC_PLAN_SCAN: + return nodesListMakeAppend(pScanNodes, pNode); + case QUERY_NODE_LOGIC_PLAN_JOIN: + code = opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes); + if (TSDB_CODE_SUCCESS == code) { + code = opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 1), pNotOptimize, pScanNodes); + } + return code; + case QUERY_NODE_LOGIC_PLAN_AGG: + *pNotOptimize = true; + return code; + default: + break; + } + + if (1 != LIST_LENGTH(pNode->pChildren)) { + *pNotOptimize = true; + } + + return opkGetScanNodesImpl(nodesListGetNode(pNode->pChildren, 0), pNotOptimize, pScanNodes); +} + +static int32_t opkGetScanNodes(SLogicNode* pNode, SNodeList** pScanNodes) { + bool notOptimize = false; + int32_t code = opkGetScanNodesImpl(pNode, ¬Optimize, pScanNodes); + if (TSDB_CODE_SUCCESS != code || notOptimize) { + nodesClearList(*pScanNodes); + } + return code; +} + +static EOrder opkGetPrimaryKeyOrder(SSortLogicNode* pSort) { + return ((SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0))->order; +} + +static SNode* opkRewriteDownNode(SSortLogicNode* pSort) { + SNode* pDownNode = nodesListGetNode(pSort->node.pChildren, 0); + // todo + pSort->node.pChildren = NULL; + return pDownNode; +} + +static int32_t opkDoOptimized(SOptimizeContext* pCxt, SSortLogicNode* pSort, SNodeList* pScanNodes) { + EOrder order = opkGetPrimaryKeyOrder(pSort); + if (ORDER_DESC == order) { + SNode* pScan = NULL; + FOREACH(pScan, pScanNodes) { + ((SScanLogicNode*)pScan)->scanSeq[0] = 0; + ((SScanLogicNode*)pScan)->scanSeq[1] = 1; + } + } + + if (NULL == pSort->node.pParent) { + // todo + return TSDB_CODE_SUCCESS; + } + + SNode* pDownNode = opkRewriteDownNode(pSort); + SNode* pNode; + FOREACH(pNode, pSort->node.pParent->pChildren) { + if (nodesEqualNode(pNode, pSort)) { + REPLACE_NODE(pDownNode); + break; + } + } + nodesDestroyNode(pSort); + return TSDB_CODE_SUCCESS; +} + +static int32_t opkOptimizeImpl(SOptimizeContext* pCxt, SSortLogicNode* pSort) { + OPTIMIZE_FLAG_SET_MASK(pSort->node.optimizedFlag, OPTIMIZE_FLAG_OPK); + if (!opkIsPrimaryKeyOrderBy(pSort->pSortKeys) || 1 != LIST_LENGTH(pSort->node.pChildren)) { + return TSDB_CODE_SUCCESS; + } + SNodeList* pScanNodes = NULL; + int32_t code = opkGetScanNodes(nodesListGetNode(pSort->node.pChildren, 0), &pScanNodes); + if (TSDB_CODE_SUCCESS == code && NULL != pScanNodes) { + code = opkDoOptimized(pCxt, pSort, pScanNodes); + } + nodesClearList(pScanNodes); + return code; +} + +static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) { + SSortLogicNode* pSort = (SSortLogicNode*)optFindPossibleNode(pLogicNode, opkSortMayBeOptimized); + if (NULL == pSort) { + return TSDB_CODE_SUCCESS; + } + return opkOptimizeImpl(pCxt, pSort); +} + static const SOptimizeRule optimizeRuleSet[] = { { .pName = "OptimizeScanData", .optimizeFunc = osdOptimize }, - { .pName = "ConditionPushDown", .optimizeFunc = cpdOptimize } + { .pName = "ConditionPushDown", .optimizeFunc = cpdOptimize }, + { .pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize } }; static const int32_t optimizeRuleNum = (sizeof(optimizeRuleSet) / sizeof(SOptimizeRule)); -static int32_t applyOptimizeRule(SLogicNode* pLogicNode) { - SOptimizeContext cxt = { .optimized = false }; +static int32_t applyOptimizeRule(SPlanContext* pCxt, SLogicNode* pLogicNode) { + SOptimizeContext cxt = { .pPlanCxt = pCxt, .optimized = false }; do { cxt.optimized = false; for (int32_t i = 0; i < optimizeRuleNum; ++i) { @@ -594,5 +785,5 @@ static int32_t applyOptimizeRule(SLogicNode* pLogicNode) { } int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicNode* pLogicNode) { - return applyOptimizeRule(pLogicNode); + return applyOptimizeRule(pCxt, pLogicNode); } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index d5e12608acc395ba4deaee9327098eee658e93ed..28635ef940dfcfae6d3f7f2b2f4174740c42b64c 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -398,9 +398,6 @@ static int32_t createScanPhysiNodeFinalize(SPhysiPlanContext* pCxt, SScanLogicNo if (TSDB_CODE_SUCCESS == code) { pScanPhysiNode->uid = pScanLogicNode->pMeta->uid; pScanPhysiNode->tableType = pScanLogicNode->pMeta->tableType; - pScanPhysiNode->order = TSDB_ORDER_ASC; - pScanPhysiNode->count = 1; - pScanPhysiNode->reverse = 0; memcpy(&pScanPhysiNode->tableName, &pScanLogicNode->tableName, sizeof(SName)); } @@ -432,7 +429,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp return TSDB_CODE_OUT_OF_MEMORY; } - pTableScan->scanFlag = pScanLogicNode->scanFlag; + memcpy(pTableScan->scanSeq, pScanLogicNode->scanSeq, sizeof(pScanLogicNode->scanSeq)); pTableScan->scanRange = pScanLogicNode->scanRange; pTableScan->ratio = pScanLogicNode->ratio; vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode); diff --git a/source/libs/planner/test/planOptTest.cpp b/source/libs/planner/test/planOptTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..4a682a5e17ff8443ab43d020fbe77fa6e18182e7 --- /dev/null +++ b/source/libs/planner/test/planOptTest.cpp @@ -0,0 +1,32 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "planTestUtil.h" +#include "planner.h" + +using namespace std; + +class PlanOptimizeTest : public PlannerTestBase { + +}; + +TEST_F(PlanOptimizeTest, orderByPrimaryKey) { + useDb("root", "test"); + + run("select * from t1 order by ts"); + run("select * from t1 order by ts desc"); + run("select c1 from t1 order by ts"); + run("select c1 from t1 order by ts desc"); +} diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index cca7f1cbffd2f967b491f848c0f41a48d97bf3c3..156e71a845a3a84679dea73607fc9d97f140148b 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1247,26 +1247,28 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p } int32_t nowFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - if (inputNum != 1) { - return TSDB_CODE_FAILED; + int64_t ts = taosGetTimestamp(TSDB_TIME_PRECISION_MILLI); + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + colDataAppendInt64(pOutput->columnData, i, &ts); } - colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0)); + pOutput->numOfRows = pInput->numOfRows; return TSDB_CODE_SUCCESS; } int32_t todayFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - if (inputNum != 1) { - return TSDB_CODE_FAILED; + int64_t ts = taosGetTimestampToday(TSDB_TIME_PRECISION_MILLI); + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + colDataAppendInt64(pOutput->columnData, i, &ts); } - colDataAppendInt64(pOutput->columnData, pOutput->numOfRows, (int64_t *)colDataGetData(pInput->columnData, 0)); + pOutput->numOfRows = pInput->numOfRows; return TSDB_CODE_SUCCESS; } int32_t timezoneFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) { - if (inputNum != 1) { - return TSDB_CODE_FAILED; + for (int32_t i = 0; i < pInput->numOfRows; ++i) { + colDataAppend(pOutput->columnData, i, tsTimezoneStr, false); } - colDataAppend(pOutput->columnData, pOutput->numOfRows, (char *)colDataGetData(pInput->columnData, 0), false); + pOutput->numOfRows = pInput->numOfRows; return TSDB_CODE_SUCCESS; } diff --git a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim index d04338aadd9dfa0e197a0af562490322667c2078..878e3d9031f10f45b5623195e3e7c562aec88103 100644 --- a/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim +++ b/tests/script/tsim/tmq/basic2Of2ConsOverlap.sim @@ -292,7 +292,7 @@ print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $ print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6] if $rows != 2 then sleep 1000 - goto wait_consumer_end_from_ctb + goto wait_consumer_end_from_ntb endi if $data[0][1] == 0 then if $data[1][1] != 1 then diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index cf627fb5fb211e6319f82b2f21210846260be509..774c9574f769fd67e4009ac9d49bf26beeb1585a 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -125,13 +125,13 @@ void saveConfigToLogFile() { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); taosFprintfFile(g_fp, " Topics: "); - for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfTopic; i++) { - taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[i]); + for (int j = 0 ; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { + taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); } taosFprintfFile(g_fp, "\n"); taosFprintfFile(g_fp, " Key: "); - for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfKey; i++) { - taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]); + for (int k = 0 ; k < g_stConfInfo.stThreads[i].numOfKey; k++) { + taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]); } taosFprintfFile(g_fp, "\n"); }