diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx
index aa1186fa52058aa88412546bfa252e43f1a3fcf9..461bdfbf162e696b430c1edb9b09ada70e086fb9 100644
--- a/docs/en/14-reference/03-connector/07-python.mdx
+++ b/docs/en/14-reference/03-connector/07-python.mdx
@@ -24,6 +24,36 @@ The source code for the Python connector is hosted on [GitHub](https://github.co
 
 We recommend using the latest version of `taospy`, regardless of the version of TDengine.
 
+## Handling Exceptions
+
+The Python connector may raise five types of exceptions:
+
+- Exceptions from the Python connector itself.
+- Exceptions from the native library.
+- Exceptions from the websocket connection.
+- Exceptions from data subscription.
+- Exceptions from other TDengine function modules.
+
+|Error Type|Description|Suggested Actions|
+|:--------:|:---------:|:---------------:|
+|InterfaceError|the native library is too old to support the function used|please check the TDengine client version|
+|ConnectionError|connection error|please check TDengine's status and the connection params|
+|DatabaseError|database error|please check the TDengine server version and upgrade the Python connector to the latest version|
+|OperationalError|operation error|API usage error, please check your code|
+|ProgrammingError|||
+|StatementError|the exception of stmt||
+|ResultError|||
+|SchemalessError|the exception of schemaless||
+|TmqError|the exception of tmq||
+
+Exceptions are usually handled with try-except in Python. For exception handling, please refer to the [Python Errors and Exceptions documentation](https://docs.python.org/3/tutorial/errors.html).
+
+All exceptions from the Python connector are thrown directly. Applications should handle these exceptions. For example:
+
+```python
+{{#include docs/examples/python/handle_exception.py}}
+```
+
 ## Supported features
 
 - Native connections support all the core features of TDengine, including connection management, SQL execution, bind interface, subscriptions, and schemaless writing.
@@ -343,6 +373,8 @@ For a more detailed description of the `sql()` method, please refer to [RestClie
 
 
+The `Connection` class contains both an implementation of the PEP249 Connection interface (e.g., the `cursor()` and `close()` methods) and many extensions (e.g., the `execute()`, `query()`, `schemaless_insert()`, and `subscribe()` methods).
+
 ```python
 {{#include docs/examples/python/connect_websocket_examples.py:basic}}
 ```
@@ -353,6 +385,46 @@ For a more detailed description of the `sql()` method, please refer to [RestClie
 
 
+### Querying Data
+
+
+
+
+The `query` method of the `TaosConnection` class can be used to query data and return the result data of type `TaosResult`.
+
+```python
+{{#include docs/examples/python/connection_usage_native_reference.py:query}}
+```
+
+:::tip
+The queried results can only be fetched once. For example, only one of `fetch_all()` and `fetch_all_into_dict()` can be used in the example above. Repeated fetches will result in an empty list.
+:::
+
+
+
+
+
+The `RestClient` class is a direct wrapper for the [REST API](/reference/rest-api). It contains only a `sql()` method for executing arbitrary SQL statements and returning the result.
+
+```python
+{{#include docs/examples/python/rest_client_example.py}}
+```
+
+For a more detailed description of the `sql()` method, please refer to [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html).
+
+
+
+
+
+The `query` method of the `TaosConnection` class can be used to query data and return the result data of type `TaosResult`.
+
+```python
+{{#include docs/examples/python/connect_websocket_examples.py:basic}}
+```
+
+
+
 
 ### Usage with req_id
 
 By using the optional req_id parameter, you can specify a request ID that can be used for tracing.
 
@@ -811,14 +883,6 @@ bind multiple rows at once |
 
 ## Other notes
 
-### Exception handling
-
-All errors from database operations are thrown directly as exceptions and the error message from the database is passed up the exception stack. The application is responsible for exception handling. For example:
-
-```python
-{{#include docs/examples/python/handle_exception.py}}
-```
-
 ### About nanoseconds
 
 Due to the current imperfection of Python's nanosecond support (see link below), the current implementation returns integers at nanosecond precision instead of the `datetime` type produced by `ms` and `us`, which application developers will need to handle on their own. It is recommended to use pandas' to_datetime(). The Python Connector may modify the interface in the future if Python officially supports nanoseconds in full.
diff --git a/docs/zh/08-connector/30-python.mdx b/docs/zh/08-connector/30-python.mdx
index f4f1aad63b846aff51fbbba504e5c0e24479315f..8752dc214565c7834cdc6903f5247cd4c64194a2 100644
--- a/docs/zh/08-connector/30-python.mdx
+++ b/docs/zh/08-connector/30-python.mdx
@@ -25,6 +25,36 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
 
 无论使用什么版本的 TDengine 都建议使用最新版本的 `taospy`。
 
+## 处理异常
+
+Python 连接器可能会产生 5 种异常:
+
+- Python 连接器本身的异常
+- 原生连接方式的异常
+- websocket 连接方式异常
+- 数据订阅异常
+- TDengine 其他功能模块的异常
+
+|Error Type|Description|Suggested Actions|
+|:--------:|:---------:|:---------------:|
+|InterfaceError|taosc 版本太低,不支持所使用的接口|请检查 TDengine 客户端版本|
+|ConnectionError|数据库链接错误|请检查 TDengine 服务端状态和连接参数|
+|DatabaseError|数据库错误|请检查 TDengine 服务端版本,并将 Python 连接器升级到最新版|
+|OperationalError|操作错误|API 使用错误,请检查代码|
+|ProgrammingError|||
+|StatementError|stmt 相关异常||
+|ResultError|||
+|SchemalessError|schemaless 相关异常||
+|TmqError|tmq 相关异常||
+
+Python 中通常通过 try-except 处理异常,异常处理相关请参考 [Python 错误和异常文档](https://docs.python.org/3/tutorial/errors.html)。
+
+Python Connector 的所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:
+
+```python
+{{#include docs/examples/python/handle_exception.py}}
+```
+
 ## 支持的功能
 
 - 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
@@ -32,7 +62,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
 
 ## 安装
 
-### 准备
+### 安装前准备
 
 1. 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。
 2. 安装 [pip](https://pypi.org/project/pip/)。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 [pip documentation](https://pip.pypa.io/en/stable/installation/) 安装。
@@ -274,7 +304,7 @@ Transfer-Encoding: chunked
 
 
 
-## 示例程序
+## 使用示例
 
 ### 基本使用
 
@@ -343,6 +373,10 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
 
 
 
+#### Connection 类的使用
+
+`Connection` 类既包含对 PEP249 Connection 接口的实现(如:cursor 方法和 close 方法),也包含很多扩展功能(如:execute、query、schemaless_insert 和 subscribe 方法)。
+
 ```python
 {{#include docs/examples/python/connect_websocket_examples.py:basic}}
 ```
@@ -353,6 +387,46 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
 
 
 
+### 查询数据
+
+
+
+
+`TaosConnection` 类的 `query` 方法可以用来查询数据,返回 `TaosResult` 类型的结果数据。
+
+```python
+{{#include docs/examples/python/connection_usage_native_reference.py:query}}
+```
+
+:::tip
+查询结果只能获取一次。比如上面的示例中 `fetch_all()` 和 `fetch_all_into_dict()` 只能用一个。重复获取得到的结果为空列表。
+:::
+
+
+
+
+
+RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。
+
+```python
+{{#include docs/examples/python/rest_client_example.py}}
+```
+
+对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
+
+
+
+
+
+`TaosConnection` 类的 `query` 方法可以用来查询数据,返回 `TaosResult` 类型的结果数据。
+
+```python
+{{#include docs/examples/python/connect_websocket_examples.py:basic}}
+```
+
+
+
+
 ### 与 req_id 一起使用
 
 使用可选的 req_id 参数,指定请求 id,可以用于 tracing
 
@@ -807,7 +881,7 @@ stmt.close()
 
 
 
-### 其它示例程序
+### 更多示例程序
 
 | 示例程序链接                                                                                                    | 示例程序内容            |
 | ------------------------------------------------------------------------------------------------------------- | ----------------------- |
@@ -819,14 +893,6 @@ stmt.close()
 
 ## 其它说明
 
-### 异常处理
-
-所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:
-
-```python
-{{#include docs/examples/python/handle_exception.py}}
-```
-``
 ### 关于纳秒 (nanosecond)
 
 由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。
diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h
index 6cb7d8852310082e0f1431c265c01752f5d527b7..cd8e0642cf2dcae26ae288e421083f8accae54d4 100644
--- a/include/common/tdatablock.h
+++ b/include/common/tdatablock.h
@@ -187,6 +187,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
 int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
 
 int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
+int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx);
 void    colDataTrim(SColumnInfoData* pColumnInfoData);
 
 size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
diff --git a/include/common/tglobal.h b/include/common/tglobal.h
index 8aa17e46d1b832c170d808d4124ce57dcb25b4c4..d53f78b41e25f127c7bb46e7d5218ecf0f2bc6df 100644
--- a/include/common/tglobal.h
+++ b/include/common/tglobal.h
@@ -184,6 +184,7 @@ extern int64_t tsStreamBufferSize;
 extern int64_t tsCheckpointInterval;
 extern bool    tsFilterScalarMode;
 extern int32_t tsMaxStreamBackendCache;
+extern int32_t tsPQSortMemThreshold;
 
 // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
 
diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h
index 453c5d49142934b79758f3faf5e3c12c80c16a55..c1481da80cae306eceebce55c80e44ddadabca88 100644
--- a/include/libs/nodes/plannodes.h
+++ b/include/libs/nodes/plannodes.h
@@ -246,6 +246,7 @@ typedef struct SSortLogicNode {
   SLogicNode node;
   SNodeList* pSortKeys;
   bool       groupSort;
+  int64_t    maxRows;
 } SSortLogicNode;
 
 typedef struct
SPartitionLogicNode { @@ -523,6 +524,7 @@ typedef struct SSortPhysiNode { SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode SNodeList* pTargets; + int64_t maxRows; } SSortPhysiNode; typedef SSortPhysiNode SGroupSortPhysiNode; diff --git a/include/util/theap.h b/include/util/theap.h index fb5ff8301a9b08b2cb53c353a363042c2de9cc34..8ddeeb28a43c107a0986d76beecebe0f0aa894d0 100644 --- a/include/util/theap.h +++ b/include/util/theap.h @@ -17,6 +17,7 @@ #define _TD_UTIL_HEAP_H_ #include "os.h" +#include "tarray.h" #ifdef __cplusplus extern "C" { @@ -58,6 +59,48 @@ void heapDequeue(Heap* heap); size_t heapSize(Heap* heap); +typedef bool (*pq_comp_fn)(void* l, void* r, void* param); + +typedef struct PriorityQueueNode { + void* data; +} PriorityQueueNode; + +typedef struct PriorityQueue PriorityQueue; + +PriorityQueue* createPriorityQueue(pq_comp_fn fn, FDelete deleteFn, void* param); + +void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn); + +void destroyPriorityQueue(PriorityQueue* pq); + +PriorityQueueNode* taosPQTop(PriorityQueue* pq); + +size_t taosPQSize(PriorityQueue* pq); + +void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node); + +void taosPQPop(PriorityQueue* pq); + +typedef struct BoundedQueue BoundedQueue; + +BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete deleteFn, void* param); + +void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn); + +void destroyBoundedQueue(BoundedQueue* q); + +void taosBQPush(BoundedQueue* q, PriorityQueueNode* n); + +PriorityQueueNode* taosBQTop(BoundedQueue* q); + +size_t taosBQSize(BoundedQueue* q); + +size_t taosBQMaxSize(BoundedQueue* q); + +void taosBQBuildHeap(BoundedQueue* q); + +void taosBQPop(BoundedQueue* q); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5d1288d831c5c68ef52140d062111347684213e1..96889882b65f75ccb7f7de5095d03286c1a2609d 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -47,6 +47,17 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo } } + +int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) { + if (colDataIsNull_s(pColumnInfoData, rowIdx)) return 0; + + if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) return pColumnInfoData->info.bytes; + if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) + return getJsonValueLen(colDataGetData(pColumnInfoData, rowIdx)); + else + return varDataTLen(colDataGetData(pColumnInfoData, rowIdx)); +} + int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 948174b565f133ee160f1926c9c674f53539e0ae..5f6ec92d50bd0d33dcee43abee82ce25d56f887e 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -62,6 +62,7 @@ int32_t tsNumOfQnodeFetchThreads = 1; int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeWriteThreads = 1; int32_t tsMaxStreamBackendCache = 128; // M +int32_t tsPQSortMemThreshold = 16; // M // sync raft int32_t tsElectInterval = 25 * 1000; @@ -533,6 +534,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, 0) != 0) return -1; if 
(cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, 0) != 0) return -1; GRANT_CFG_ADD; return 0; @@ -914,6 +916,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval; tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32; + tsPQSortMemThreshold = cfgGetItem(pCfg, "pqSortMemThreshold")->i32; GRANT_CFG_GET; return 0; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 30911c6061a8634e55c1bc38468bc066ced19f72..33c9d845b9bded28acf0c3ec796266bc5476371d 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -75,10 +75,11 @@ typedef struct SResultRowInfo { } SResultRowInfo; typedef struct SColMatchItem { - int32_t colId; - int32_t srcSlotId; - int32_t dstSlotId; - bool needOutput; + int32_t colId; + int32_t srcSlotId; + int32_t dstSlotId; + bool needOutput; + SDataType dataType; } SColMatchItem; typedef struct SColMatchInfo { diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 78c56c0405f6010efc370be8088a367ac12e0f42..7a0d236a3778faed5cab419ce565b9fff7162312 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -64,10 +64,14 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* /** * * @param type + * @param maxRows keep maxRows at most + * @param maxTupleLength max len of one tuple, for check if heap sort is applicable + * @param sortBufSize sort memory buf size, for check if heap sort is applicable * @return */ SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, - SSDataBlock* pBlock, const char* idstr); + SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength, + uint32_t sortBufSize); /** * diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 09280295571ac711d8b4cfaaac587e25c2d0733e..cfea233a1ca49e47d8eb26b752f380e59b1dd7c1 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1305,6 +1305,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod c.colId = pColNode->colId; c.srcSlotId = pColNode->slotId; c.dstSlotId = pNode->slotId; + c.dataType = pColNode->node.resType; taosArrayPush(pList, &c); } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 008a3697fcc508810af0c30ece6f0c61c408c4ba..a65f5a27ab1581cb95b711040d17e9961ac43978 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2801,7 +2801,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1); int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, - pInfo->pSortInputBlock, pTaskInfo->id.str); + pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 585c2e8c541461347475d302cd7a305e18cea336..20fb588a026d7c287fa005188faa854d3a6ebb1f 100644 --- a/source/libs/executor/src/sortoperator.c +++ 
b/source/libs/executor/src/sortoperator.c @@ -29,6 +29,8 @@ typedef struct SSortOperatorInfo { int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. SLimitInfo limitInfo; + uint64_t maxTupleLength; + int64_t maxRows; } SSortOperatorInfo; static SSDataBlock* doSort(SOperatorInfo* pOperator); @@ -36,6 +38,7 @@ static int32_t doOpenSortOperator(SOperatorInfo* pOperator); static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); static void destroySortOperatorInfo(void* param); +static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys); // todo add limit/offset impl SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { @@ -51,6 +54,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* int32_t numOfCols = 0; pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); pOperator->exprSupp.numOfExprs = numOfCols; + calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys); + pInfo->maxRows = pSortNode->maxRows; int32_t numOfOutputCols = 0; int32_t code = @@ -193,9 +198,9 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { } pInfo->startTs = taosGetTimestampUs(); - // pInfo->binfo.pRes is not equalled to the input datablock. - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, + pInfo->maxRows, pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); @@ -286,6 +291,20 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* return TSDB_CODE_SUCCESS; } +static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys) { + SColMatchInfo* pColItem = &pSortOperInfo->matchInfo; + size_t size = taosArrayGetSize(pColItem->pList); + for (size_t i = 0; i < size; ++i) { + pSortOperInfo->maxTupleLength += ((SColMatchItem*)taosArrayGet(pColItem->pList, i))->dataType.bytes; + } + size = LIST_LENGTH(pSortKeys); + for (size_t i = 0; i < size; ++i) { + SOrderByExprNode* pOrderExprNode = (SOrderByExprNode*)nodesListGetNode(pSortKeys, i); + pSortOperInfo->maxTupleLength += ((SColumnNode*)pOrderExprNode->pExpr)->node.resType.bytes; + } + return TSDB_CODE_SUCCESS; +} + //===================================================================================== // Group Sort Operator typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus; @@ -384,7 +403,7 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) { // pInfo->binfo.pRes is not equalled to the input datablock. 
   pInfo->pCurrSortHandle =
-      tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str);
+      tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0, 0);
 
   tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);
 
@@ -582,7 +601,7 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
   int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
 
   pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
-                                             pInfo->pInputBlock, pTaskInfo->id.str);
+                                             pInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0);
 
   tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
   tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort);
diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c
index 58b3428b5bae2a88cac2ae51e7cfc9f3f7590948..daf06c81d13d0d85aa6d47f3147d822a0e915311 100644
--- a/source/libs/executor/src/tsort.c
+++ b/source/libs/executor/src/tsort.c
@@ -19,6 +19,7 @@
 #include "tcompare.h"
 #include "tdatablock.h"
 #include "tdef.h"
+#include "theap.h"
 #include "tlosertree.h"
 #include "tpagedbuf.h"
 #include "tsort.h"
@@ -41,6 +42,12 @@ struct SSortHandle {
   int64_t  startTs;
   uint64_t totalElapsed;
 
+  uint64_t      maxRows;
+  uint32_t      maxTupleLength;
+  uint32_t      sortBufSize;
+  BoundedQueue* pBoundedQueue;
+  uint32_t      tmpRowIdx;
+
   int32_t           sourceId;
   SSDataBlock*      pDataBlock;
   SMsortComparParam cmpParam;
@@ -61,6 +68,47 @@ struct SSortHandle {
 
 static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
 
+// | offset[0] | offset[1] |....| nullbitmap | data |...|
+static void* createTuple(uint32_t columnNum, uint32_t tupleLen) {
+  uint32_t totalLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum) + tupleLen;
+  return taosMemoryCalloc(1, totalLen);
+}
+static void destoryTuple(void* t) { taosMemoryFree(t); }
+
+#define tupleOffset(tuple, colIdx) ((uint32_t*)(tuple + sizeof(uint32_t) * colIdx))
+#define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = offset)
+#define tupleSetNull(tuple, colIdx, colNum) colDataSetNull_f((char*)tuple + sizeof(uint32_t) * colNum, colIdx)
+#define tupleColIsNull(tuple, colIdx, colNum) colDataIsNull_f((char*)tuple + sizeof(uint32_t) * colNum, colIdx)
+#define tupleGetDataStartOffset(colNum) (sizeof(uint32_t) * colNum + BitmapLen(colNum))
+#define tupleSetData(tuple, offset, data, length) memcpy(tuple + offset, data, length)
+
+/**
+ * @param t the tuple pointer addr, if realloced, *t is changed to the new addr
+ * @param offset copy data into pTuple start from offset
+ * @param colIndex the columnIndex, for setting null bitmap
+ * @return the next offset to add field
+ * */
+static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data,
+                                   size_t length, bool isNull, uint32_t tupleLen) {
+  tupleSetOffset(*t, colIdx, offset);
+  if (isNull) {
+    tupleSetNull(*t, colIdx, colNum);
+  } else {
+    if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) {
+      *t = taosMemoryRealloc(*t, offset + length);
+    }
+    tupleSetData(*t, offset, data, length);
+  }
+  return offset + length;
+}
+
+static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
+  if (tupleColIsNull(t, colIdx, colNum)) return NULL;
+  return t + *tupleOffset(t, colIdx);
+}
+
+static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param);
+
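For reference, a minimal standalone sketch of the same tuple layout (offsets array, then null bitmap, then packed data). This is illustrative only: fixed-length columns, no null handling, and every `_demo` name is made up rather than a TDengine API.

```c
/* Layout sketch: | offset[0] | offset[1] | ... | null bitmap | data ... | */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define BITMAP_LEN(n) (((n) + 7) >> 3) /* one null bit per column */

static uint32_t tuple_data_start_demo(uint32_t col_num) {
  return (uint32_t)sizeof(uint32_t) * col_num + BITMAP_LEN(col_num);
}

/* Append one field; returns the offset where the next field begins. */
static uint32_t tuple_add_field_demo(char* t, uint32_t offset, uint32_t col_idx,
                                     const void* data, uint32_t len) {
  ((uint32_t*)t)[col_idx] = offset; /* record where this column's value lives */
  memcpy(t + offset, data, len);    /* copy the value into the data region */
  return offset + len;
}

int main(void) {
  const uint32_t col_num = 2;
  int64_t ts = 1652572868000LL; /* column 0: a timestamp */
  int32_t c2 = 116;             /* column 1: an int value */

  uint32_t tuple_len = sizeof(ts) + sizeof(c2);
  char*    tuple = calloc(1, tuple_data_start_demo(col_num) + tuple_len);

  uint32_t off = tuple_data_start_demo(col_num);
  off = tuple_add_field_demo(tuple, off, 0, &ts, sizeof(ts));
  off = tuple_add_field_demo(tuple, off, 1, &c2, sizeof(c2));

  /* read a field back through its recorded offset */
  int32_t read_back;
  memcpy(&read_back, tuple + ((uint32_t*)tuple)[1], sizeof(read_back));
  printf("c2 = %d\n", read_back); /* prints: c2 = 116 */
  free(tuple);
  return 0;
}
```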
 SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
   return createOneDataBlock(pSortHandle->pDataBlock, false);
 }
@@ -71,7 +119,8 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
  * @return
  */
 SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
-                                   SSDataBlock* pBlock, const char* idstr) {
+                                   SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
+                                   uint32_t sortBufSize) {
   SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
 
   pSortHandle->type = type;
@@ -80,6 +129,13 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
   pSortHandle->pSortInfo = pSortInfo;
   pSortHandle->loops = 0;
 
+  pSortHandle->maxTupleLength = maxTupleLength;
+  if ((int64_t)maxRows < 0)
+    pSortHandle->sortBufSize = 0;
+  else
+    pSortHandle->sortBufSize = sortBufSize;
+  pSortHandle->maxRows = maxRows;
+
   if (pBlock != NULL) {
     pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
   }
@@ -150,7 +206,6 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
   if (pSortHandle == NULL) {
     return;
   }
-  tsortClose(pSortHandle);
 
   if (pSortHandle->pMergeTree != NULL) {
     tMergeTreeDestroy(&pSortHandle->pMergeTree);
@@ -159,6 +214,7 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
   destroyDiskbasedBuf(pSortHandle->pBuf);
   taosMemoryFreeClear(pSortHandle->idStr);
   blockDataDestroy(pSortHandle->pDataBlock);
+  if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue);
 
   int64_t fetchUs = 0, fetchNum = 0;
   tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
@@ -769,17 +825,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
   return code;
 }
 
-int32_t tsortOpen(SSortHandle* pHandle) {
-  if (pHandle->opened) {
-    return 0;
-  }
-
-  if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
-    return -1;
-  }
-
-  pHandle->opened = true;
-
+static int32_t tsortOpenForBufMergeSort(SSortHandle* pHandle) {
   int32_t code = createInitialSources(pHandle);
   if (code != TSDB_CODE_SUCCESS) {
     return code;
@@ -840,7 +886,7 @@ int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
   return TSDB_CODE_SUCCESS;
 }
 
-STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
+static STupleHandle* tsortBufMergeSortNextTuple(SSortHandle* pHandle) {
   if (tsortIsClosed(pHandle)) {
     return NULL;
   }
@@ -890,6 +936,168 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
   return &pHandle->tupleHandle;
 }
 
+static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
+  if (pHandle->type != SORT_SINGLESOURCE_SORT) return false;
+  uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*));
+  return maxRowsFitInMemory > pHandle->maxRows;
+}
+
+static bool tsortPQCompFn(void* a, void* b, void* param) {
+  SSortHandle* pHandle = param;
+  int32_t      res = pHandle->comparFn(a, b, param);
+  if (res < 0) return 1;
+  return 0;
+}
+
+static bool tsortPQComFnReverse(void* a, void* b, void* param) {
+  SSortHandle* pHandle = param;
+  int32_t      res = pHandle->comparFn(a, b, param);
+  if (res > 0) return 1;
+  return 0;
+}
+
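The check in `tsortIsPQSortApplicable` is a memory heuristic: heap sort is chosen only when all `maxRows` kept tuples (payload plus one queue pointer each) fit in the configured buffer. A worked sketch with illustrative numbers; 16 MB is the default `pqSortMemThreshold`, while the tuple length and row limit are made up:

```c
/* Sketch of the PQ-sort applicability heuristic with illustrative numbers. */
#include <stdint.h>
#include <stdio.h>

int main(void) {
  uint64_t sort_buf_size    = 16ULL * 1024 * 1024; /* tsPQSortMemThreshold MB */
  uint32_t max_tuple_length = 100;                 /* widest possible row, bytes */
  uint64_t max_rows         = 4;                   /* e.g. LIMIT 2,2 -> 2 + 2 */

  /* each kept tuple costs its payload plus one pointer held by the queue */
  uint64_t rows_fit = sort_buf_size / (max_tuple_length + sizeof(char*));
  printf("rows that fit: %llu -> heap sort %s\n", (unsigned long long)rows_fit,
         rows_fit > max_rows ? "applicable" : "not applicable");
  return 0;
}
```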
+static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param) {
+  char*        pLTuple = (char*)pLeft;
+  char*        pRTuple = (char*)pRight;
+  SSortHandle* pHandle = (SSortHandle*)param;
+  SArray*      orderInfo = (SArray*)pHandle->pSortInfo;
+  uint32_t     colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
+  for (int32_t i = 0; i < orderInfo->size; ++i) {
+    SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i);
+    void* lData = tupleGetField(pLTuple, pOrder->slotId, colNum);
+    void* rData = tupleGetField(pRTuple, pOrder->slotId, colNum);
+    if (!lData && !rData) continue;
+    if (!lData) return pOrder->nullFirst ? -1 : 1;
+    if (!rData) return pOrder->nullFirst ? 1 : -1;
+
+    int           type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type;
+    __compar_fn_t fn = getKeyComparFunc(type, pOrder->order);
+
+    int ret = fn(lData, rData);
+    if (ret == 0) {
+      continue;
+    } else {
+      return ret;
+    }
+  }
+  return 0;
+}
+
+static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
+  pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destoryTuple, pHandle);
+  if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
+  tsortSetComparFp(pHandle, colDataComparFn);
+
+  SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
+  SSortSource*  source = *pSource;
+
+  pHandle->pDataBlock = NULL;
+  uint32_t          tupleLen = 0;
+  PriorityQueueNode pqNode;
+  while (1) {
+    // fetch data
+    SSDataBlock* pBlock = pHandle->fetchfp(source->param);
+    if (NULL == pBlock) break;
+
+    if (pHandle->beforeFp != NULL) {
+      pHandle->beforeFp(pBlock, pHandle->param);
+    }
+    if (pHandle->pDataBlock == NULL) {
+      pHandle->pDataBlock = createOneDataBlock(pBlock, false);
+    }
+    if (pHandle->pDataBlock == NULL) return TSDB_CODE_OUT_OF_MEMORY;
+
+    size_t colNum = blockDataGetNumOfCols(pBlock);
+
+    if (tupleLen == 0) {
+      for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
+        SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
+        tupleLen += pCol->info.bytes;
+        if (IS_VAR_DATA_TYPE(pCol->info.type)) {
+          tupleLen += sizeof(VarDataLenT);
+        }
+      }
+    }
+    size_t colLen = 0;
+    for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) {
+      void* pTuple = createTuple(colNum, tupleLen);
+      if (pTuple == NULL) return TSDB_CODE_OUT_OF_MEMORY;
+
+      uint32_t offset = tupleGetDataStartOffset(colNum);
+      for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
+        SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
+        if (colDataIsNull_s(pCol, rowIdx)) {
+          offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen);
+        } else {
+          colLen = colDataGetRowLength(pCol, rowIdx);
+          offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false,
+                                 tupleLen);
+        }
+      }
+      pqNode.data = pTuple;
+      taosBQPush(pHandle->pBoundedQueue, &pqNode);
+    }
+  }
+  return TSDB_CODE_SUCCESS;
+}
+
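What `tsortOpenForPQSort` and the comparator pair implement is a two-phase bounded top-N: phase one pushes every row into a bounded heap whose comparator keeps the worst retained row at the top so it is the one evicted, and phase two (in `tsortPQSortNextTuple` below) reverses the comparator and rebuilds the heap so rows pop in output order. A standalone sketch with plain ints, simplified to keep exactly N values rather than the maxRows + 1 slots the real queue uses; the c2 values mirror the test table d1 from the result files further down:

```c
/* Standalone sketch of the two-phase bounded top-N policy, using plain ints. */
#include <stdio.h>

#define N 3

static int heap[N];
static int hsize = 0;
static int greater(int a, int b) { return a > b; } /* phase-1 order: worst on top */
static int less(int a, int b) { return a < b; }    /* phase-2 (reversed) order */
static int (*cmp)(int, int) = greater;

static void sift_down(int i) {
  for (;;) {
    int l = 2 * i + 1, r = 2 * i + 2, m = i;
    if (l < hsize && cmp(heap[l], heap[m])) m = l;
    if (r < hsize && cmp(heap[r], heap[m])) m = r;
    if (m == i) return;
    int t = heap[i]; heap[i] = heap[m]; heap[m] = t;
    i = m;
  }
}

static void push(int v) {
  if (hsize == N) {
    if (!cmp(v, heap[0])) { /* v beats the current worst: replace the root */
      heap[0] = v;
      sift_down(0);
    }
    return;
  }
  heap[hsize++] = v; /* sift the new value up while it outranks its parent */
  for (int i = hsize - 1; i > 0 && cmp(heap[i], heap[(i - 1) / 2]); i = (i - 1) / 2) {
    int t = heap[i]; heap[i] = heap[(i - 1) / 2]; heap[(i - 1) / 2] = t;
  }
}

int main(void) {
  /* the c2 values of table d1 in the test cases below */
  int c2[] = {234, 136, 59, 58, 243, 120, 11, 196, 116, 210};
  for (int i = 0; i < 10; ++i) push(c2[i]); /* keep the 3 smallest */

  cmp = less;                                        /* like taosBQSetFn */
  for (int i = hsize - 1; i >= 0; --i) sift_down(i); /* like taosBQBuildHeap */

  while (hsize > 0) { /* prints 11 58 59, i.e. ORDER BY c2 LIMIT 3 */
    printf("%d ", heap[0]);
    heap[0] = heap[--hsize];
    sift_down(0);
  }
  printf("\n");
  return 0;
}
```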
+static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
+  blockDataCleanup(pHandle->pDataBlock);
+  blockDataEnsureCapacity(pHandle->pDataBlock, 1);
+  // abandon the top tuple if the queue size is bigger than the max size
+  if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) {
+    taosBQPop(pHandle->pBoundedQueue);
+  }
+  if (pHandle->tmpRowIdx == 0) {
+    // sort the results
+    taosBQSetFn(pHandle->pBoundedQueue, tsortPQComFnReverse);
+    taosBQBuildHeap(pHandle->pBoundedQueue);
+  }
+  if (taosBQSize(pHandle->pBoundedQueue) > 0) {
+    uint32_t           colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
+    PriorityQueueNode* node = taosBQTop(pHandle->pBoundedQueue);
+    char*              pTuple = (char*)node->data;
+
+    for (uint32_t i = 0; i < colNum; ++i) {
+      void* pData = tupleGetField(pTuple, i, colNum);
+      if (!pData) {
+        colDataSetNULL(bdGetColumnInfoData(pHandle->pDataBlock, i), 0);
+      } else {
+        colDataAppend(bdGetColumnInfoData(pHandle->pDataBlock, i), 0, pData, false);
+      }
+    }
+    pHandle->pDataBlock->info.rows++;
+    pHandle->tmpRowIdx++;
+    taosBQPop(pHandle->pBoundedQueue);
+  }
+  if (pHandle->pDataBlock->info.rows == 0) return NULL;
+  pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
+  return &pHandle->tupleHandle;
+}
+
+int32_t tsortOpen(SSortHandle* pHandle) {
+  if (pHandle->opened) {
+    return 0;
+  }
+
+  if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
+    return -1;
+  }
+
+  pHandle->opened = true;
+  if (tsortIsPQSortApplicable(pHandle))
+    return tsortOpenForPQSort(pHandle);
+  else
+    return tsortOpenForBufMergeSort(pHandle);
+}
+
+STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
+  if (pHandle->pBoundedQueue)
+    return tsortPQSortNextTuple(pHandle);
+  else
+    return tsortBufMergeSortNextTuple(pHandle);
+}
+
 bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
   SColumnInfoData* pColInfoSrc = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex);
   return colDataIsNull_s(pColInfoSrc, pVHandle->rowIndex);
diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c
index 6e4dde4ec1752f4bb3349c9c41658ab0140264ae..8305daa45e2782ac203c2376b9032c3464b66578 100644
--- a/source/libs/nodes/src/nodesCloneFuncs.c
+++ b/source/libs/nodes/src/nodesCloneFuncs.c
@@ -502,6 +502,7 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
   COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
   CLONE_NODE_LIST_FIELD(pSortKeys);
   COPY_SCALAR_FIELD(groupSort);
+  COPY_SCALAR_FIELD(maxRows);
   return TSDB_CODE_SUCCESS;
 }
 
diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c
index 0b449c5bfe7e7543ba824a4c128f055c7aa011e0..99790e0a93f67401719ae0a5343d77bb365d0cfa 100644
--- a/source/libs/nodes/src/nodesCodeFuncs.c
+++ b/source/libs/nodes/src/nodesCodeFuncs.c
@@ -2100,6 +2100,7 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
 static const char* jkSortPhysiPlanExprs = "Exprs";
 static const char* jkSortPhysiPlanSortKeys = "SortKeys";
 static const char* jkSortPhysiPlanTargets = "Targets";
+static const char* jkSortPhysiPlanMaxRows = "MaxRows";
 
 static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
   const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
@@ -2114,6 +2115,9 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
   if (TSDB_CODE_SUCCESS == code) {
     code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets);
   }
+  if (TSDB_CODE_SUCCESS == code) {
+    code = tjsonAddIntegerToObject(pJson, jkSortPhysiPlanMaxRows, pNode->maxRows);
+  }
   return code;
 }
 
@@ -2131,6 +2135,9 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
   if (TSDB_CODE_SUCCESS == code) {
     code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets);
   }
+  if (TSDB_CODE_SUCCESS == code) {
+    code = tjsonGetBigIntValue(pJson, jkSortPhysiPlanMaxRows, &pNode->maxRows);
+  }
   return code;
 }
 
diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c
index 1ca37defa4a76a6b679e85facae10a6cd758fb80..e79a520615142b977716cc837f5e31fc4bbd73c3 100644
--- a/source/libs/nodes/src/nodesMsgFuncs.c
+++ b/source/libs/nodes/src/nodesMsgFuncs.c
@@ -2594,7 +2594,7 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
   return code;
 }
 
-enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS };
+enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, PHY_SORT_CODE_MAX_ROWS };
 
 static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
   const SSortPhysiNode*
pNode = (const SSortPhysiNode*)pObj; @@ -2609,6 +2609,9 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_SORT_CODE_MAX_ROWS, pNode->maxRows); + } return code; } @@ -2632,6 +2635,9 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); break; + case PHY_SORT_CODE_MAX_ROWS: + code = tlvDecodeI64(pTlv, &pNode->maxRows); + break; default: break; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 713f12e2294c49bb1327728a7fa162d6313e31f2..4a8d100db310596a800fea8aa17336da02619496 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1027,6 +1027,7 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect return TSDB_CODE_OUT_OF_MEMORY; } + pSort->maxRows = -1; pSort->groupSort = pSelect->groupSort; pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; @@ -1298,6 +1299,7 @@ static int32_t createSetOpSortLogicNode(SLogicPlanContext* pCxt, SSetOperator* p return TSDB_CODE_OUT_OF_MEMORY; } + pSort->maxRows = -1; TSWAP(pSort->node.pLimit, pSetOperator->pLimit); int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 2d1a758f3342f5d4fea5a31d8bc578ca4960d94e..82d883714d4a3782807facf432b37f07e5a162fa 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -123,7 +123,7 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order, SLogicNode* pNode pNode->inputTsOrder = order; switch (nodeType(pNode)) { // for those nodes that will change the order, stop propagating - //case QUERY_NODE_LOGIC_PLAN_WINDOW: + // case QUERY_NODE_LOGIC_PLAN_WINDOW: case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_SORT: @@ -769,8 +769,9 @@ static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond) } SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft); SColumnNode* pRight = (SColumnNode*)(pOper->pRight); - //TODO: add cast to operator and remove this restriction of optimization - if (pLeft->node.resType.type != pRight->node.resType.type || pLeft->node.resType.bytes != pRight->node.resType.bytes) { + // TODO: add cast to operator and remove this restriction of optimization + if (pLeft->node.resType.type != pRight->node.resType.type || + pLeft->node.resType.bytes != pRight->node.resType.bytes) { return false; } SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; @@ -2575,7 +2576,7 @@ static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) { SLogicNode* pNode = tagScanOptFindAncestorWithSlimit(pTableScanNode); if (NULL != pNode) { - //TODO: only set the slimit now. push down slimit later + // TODO: only set the slimit now. 
push down slimit later
     pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit);
     ((SLimitNode*)pTableScanNode->pSlimit)->limit += ((SLimitNode*)pTableScanNode->pSlimit)->offset;
     ((SLimitNode*)pTableScanNode->pSlimit)->offset = 0;
@@ -2629,8 +2630,16 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
 }
 
 static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) {
-  if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren) ||
-      QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
+  if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren)) {
+    return false;
+  }
+
+  SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
+  if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
+    SLimitNode* pChildLimit = (SLimitNode*)(pChild->pLimit);
+    // if we have pushed down, we skip it
+    if ((*(SSortLogicNode*)pChild).maxRows != -1) return false;
+  } else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) {
     return false;
   }
   return true;
@@ -2644,8 +2653,18 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
   SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
   nodesDestroyNode(pChild->pLimit);
-  pChild->pLimit = pNode->pLimit;
-  pNode->pLimit = NULL;
+  if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
+    SLimitNode* pLimitNode = (SLimitNode*)pNode->pLimit;
+    int64_t     maxRows = -1;
+    if (pLimitNode->limit != -1) {
+      maxRows = pLimitNode->limit;
+      if (pLimitNode->offset != -1) maxRows += pLimitNode->offset;
+    }
+    ((SSortLogicNode*)pChild)->maxRows = maxRows;
+  } else {
+    pChild->pLimit = pNode->pLimit;
+    pNode->pLimit = NULL;
+  }
 
   pCxt->optimized = true;
   return TSDB_CODE_SUCCESS;
@@ -2898,7 +2917,7 @@ static SSortLogicNode* sortNonPriKeySatisfied(SLogicNode* pNode) {
   if (sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys)) {
     return NULL;
   }
-  SNode* pSortKeyNode = NULL, *pSortKeyExpr = NULL;
+  SNode *pSortKeyNode = NULL, *pSortKeyExpr = NULL;
   FOREACH(pSortKeyNode, pSort->pSortKeys) {
     pSortKeyExpr = ((SOrderByExprNode*)pSortKeyNode)->pExpr;
     switch (nodeType(pSortKeyExpr)) {
@@ -2931,7 +2950,7 @@ static int32_t sortNonPriKeyOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
       optFindEligibleNode(pLogicSubplan->pNode, sortNonPriKeyShouldOptimize, pNodeList);
   SNode* pNode = NULL;
   FOREACH(pNode, pNodeList) {
-    SSortLogicNode* pSort = (SSortLogicNode*)pNode;
+    SSortLogicNode*   pSort = (SSortLogicNode*)pNode;
     SOrderByExprNode* pOrderByExpr = (SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0);
     pSort->node.outputTsOrder = pOrderByExpr->order;
     optSetParentOrder(pSort->node.pParent, pOrderByExpr->order, NULL);
diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c
index b3d94a5e47aef06960954b88c612b9568f0e45d4..a349e2c0e9cddb0a7b5f4080649648586748e069 100644
--- a/source/libs/planner/src/planPhysiCreater.c
+++ b/source/libs/planner/src/planPhysiCreater.c
@@ -1374,6 +1374,7 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
   if (NULL == pSort) {
     return TSDB_CODE_OUT_OF_MEMORY;
   }
+  pSort->maxRows = pSortLogicNode->maxRows;
 
   SNodeList* pPrecalcExprs = NULL;
   SNodeList* pSortKeys = NULL;
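Pushed down onto a sort, the LIMIT becomes the sort's `maxRows` budget: the sort must keep `limit + offset` rows, because rows that OFFSET will skip still have to be produced and discarded upstream. A small sketch of the same arithmetic with illustrative values:

```c
/* Sketch of the maxRows computation performed by pushDownLimitOptimize. */
#include <stdint.h>
#include <stdio.h>

static int64_t sortMaxRows(int64_t limit, int64_t offset) {
  int64_t maxRows = -1; /* -1 means no limit was pushed down */
  if (limit != -1) {
    maxRows = limit;
    if (offset != -1) maxRows += offset;
  }
  return maxRows;
}

int main(void) {
  printf("%lld\n", (long long)sortMaxRows(4, -1));  /* LIMIT 4   -> 4  */
  printf("%lld\n", (long long)sortMaxRows(2, 2));   /* LIMIT 2,2 -> 4  */
  printf("%lld\n", (long long)sortMaxRows(-1, -1)); /* no LIMIT  -> -1 */
  return 0;
}
```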
diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c
index 246ee13fb00aa7d30857e63a03f18262ffb10510..f352a2bba3089ff5ef7ef89e53a9f0b54d4eff13 100644
--- a/source/libs/planner/src/planSpliter.c
+++ b/source/libs/planner/src/planSpliter.c
@@ -1018,6 +1018,7 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
   splSetParent((SLogicNode*)pPartSort);
   pPartSort->pSortKeys = pSortKeys;
   pPartSort->groupSort = pSort->groupSort;
+  pPartSort->maxRows = pSort->maxRows;
 
   code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
 }
diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c
index 794d80bbfaec7c38eca4b38f78c62022fd6cc7cc..92f34db16d2d84b0edbf284ecb5c0b1c0a5abc60 100644
--- a/source/libs/sync/src/syncPipeline.c
+++ b/source/libs/sync/src/syncPipeline.c
@@ -482,6 +482,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
     if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) {
       sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(),
              pEntry->index);
+      taosMsleep(1);
       goto _out;
     }
     ASSERT(pEntry->index == pBuf->matchIndex);
diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c
index cf796c386297271d1138189ea812fe4b1fbe984c..ae1c775a18f6c47291f3065f83db0c2ff8cee94e 100644
--- a/source/libs/sync/src/syncUtil.c
+++ b/source/libs/sync/src/syncUtil.c
@@ -364,10 +364,10 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64
   if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
     pSyncNode->hbSlowNum++;
 
-    sNInfo(pSyncNode,
-           "recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
-           ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
-           DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
+    sNTrace(pSyncNode,
+            "recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
+            ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
+            DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
   }
 
   sNTrace(pSyncNode,
diff --git a/source/util/src/theap.c b/source/util/src/theap.c
index 8c1a1db05709e3ef3eceb3329185d82f0b5485ed..d60606008fbacfb958d8e6ba695464da5e903838 100644
--- a/source/util/src/theap.c
+++ b/source/util/src/theap.c
@@ -187,3 +187,172 @@ void heapRemove(Heap* heap, HeapNode* node) {
 }
 
 void heapDequeue(Heap* heap) { heapRemove(heap, heap->min); }
+
+
+struct PriorityQueue {
+  SArray*    container;
+  pq_comp_fn fn;
+  FDelete    deleteFn;
+  void*      param;
+};
+
+PriorityQueue* createPriorityQueue(pq_comp_fn fn, FDelete deleteFn, void* param) {
+  PriorityQueue* pq = (PriorityQueue*)taosMemoryCalloc(1, sizeof(PriorityQueue));
+  pq->container = taosArrayInit(1, sizeof(PriorityQueueNode));
+  pq->fn = fn;
+  pq->deleteFn = deleteFn;
+  pq->param = param;
+  return pq;
+}
+
+void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn) { pq->fn = fn; }
+
+void destroyPriorityQueue(PriorityQueue* pq) {
+  if (pq->deleteFn)
+    taosArrayDestroyP(pq->container, pq->deleteFn);
+  else
+    taosArrayDestroy(pq->container);
+  taosMemoryFree(pq);
+}
+
+static size_t pqParent(size_t i) { return (--i) >> 1; /* (i - 1) / 2 */ }
+static size_t pqLeft(size_t i) { return (i << 1) | 1; /* i * 2 + 1 */ }
+static size_t pqRight(size_t i) { return (++i) << 1; /* (i + 1) * 2 */ }
+static void   pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) {
+  void* tmp = a->data;
+  a->data = b->data;
+  b->data = tmp;
+}
+
+#define pqContainerGetEle(pq, i) ((PriorityQueueNode*)taosArrayGet((pq)->container, (i)))
+#define pqContainerSize(pq) (taosArrayGetSize((pq)->container))
+
+size_t taosPQSize(PriorityQueue* pq) { return pqContainerSize(pq); }
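A usage sketch for this queue (it assumes the `theap.h` declarations added above). Note the comparator convention implied by `pqHeapify`/`pqReverseHeapify` below: returning true sinks `l` below `r`, so the element the comparator ranks last surfaces at the top; with `l > r` the smallest value therefore pops first.

```c
/* Sketch only: compiles against TDengine's internal theap.h. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include "theap.h"

/* returning true sinks l below r, so larger values sink: smallest on top */
static bool sinkLargerFn(void* l, void* r, void* param) {
  (void)param;
  return (intptr_t)l > (intptr_t)r;
}

void pqDemo(void) {
  /* no delete callback: values live in the data pointer itself */
  PriorityQueue* pq = createPriorityQueue(sinkLargerFn, NULL, NULL);

  int vals[] = {243, 11, 136};
  for (int i = 0; i < 3; ++i) {
    PriorityQueueNode node = {.data = (void*)(intptr_t)vals[i]};
    taosPQPush(pq, &node);
  }

  while (taosPQSize(pq) > 0) { /* prints: 11 136 243 */
    printf("%d ", (int)(intptr_t)taosPQTop(pq)->data);
    taosPQPop(pq);
  }
  destroyPriorityQueue(pq);
}
```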
+
+static void pqHeapify(PriorityQueue* pq, size_t from, size_t last) {
+  size_t largest = from;
+  do {
+    from = largest;
+    size_t l = pqLeft(from);
+    size_t r = pqRight(from);
+    if (l < last && pq->fn(pqContainerGetEle(pq, from)->data, pqContainerGetEle(pq, l)->data, pq->param)) {
+      largest = l;
+    }
+    if (r < last && pq->fn(pqContainerGetEle(pq, largest)->data, pqContainerGetEle(pq, r)->data, pq->param)) {
+      largest = r;
+    }
+    if (largest != from) {
+      pqSwapPQNode(pqContainerGetEle(pq, from), pqContainerGetEle(pq, largest));
+    }
+  } while (largest != from);
+}
+
+static void pqBuildHeap(PriorityQueue* pq) {
+  if (pqContainerSize(pq) > 1) {
+    for (size_t i = pqContainerSize(pq) - 1; i > 0; --i) {
+      pqHeapify(pq, i, pqContainerSize(pq));
+    }
+    pqHeapify(pq, 0, pqContainerSize(pq));
+  }
+}
+
+static void pqReverseHeapify(PriorityQueue* pq, size_t i) {
+  while (i > 0 && !pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) {
+    size_t parentIdx = pqParent(i);
+    pqSwapPQNode(pqContainerGetEle(pq, i), pqContainerGetEle(pq, parentIdx));
+    i = parentIdx;
+  }
+}
+
+static void pqUpdate(PriorityQueue* pq, size_t i) {
+  if (i == 0 || pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) {
+    // if value in pos i is smaller than parent, heapify down from i to the end
+    pqHeapify(pq, i, pqContainerSize(pq));
+  } else {
+    // if value in pos i is bigger than parent, heapify up from i
+    pqReverseHeapify(pq, i);
+  }
+}
+
+static void pqRemove(PriorityQueue* pq, size_t i) {
+  if (i == pqContainerSize(pq) - 1) {
+    taosArrayPop(pq->container);
+    return;
+  }
+
+  taosArraySet(pq->container, i, taosArrayGet(pq->container, pqContainerSize(pq) - 1));
+  taosArrayPop(pq->container);
+  pqUpdate(pq, i);
+}
+
+PriorityQueueNode* taosPQTop(PriorityQueue* pq) { return pqContainerGetEle(pq, 0); }
+
+void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node) {
+  taosArrayPush(pq->container, node);
+  pqReverseHeapify(pq, pqContainerSize(pq) - 1);
+}
+
+void taosPQPop(PriorityQueue* pq) {
+  PriorityQueueNode* top = taosPQTop(pq);
+  if (pq->deleteFn) pq->deleteFn(top->data);
+  pqRemove(pq, 0);
+}
+
+struct BoundedQueue {
+  PriorityQueue* queue;
+  uint32_t       maxSize;
+};
+
+BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete deleteFn, void* param) {
+  BoundedQueue* q = (BoundedQueue*)taosMemoryCalloc(1, sizeof(BoundedQueue));
+  q->queue = createPriorityQueue(fn, deleteFn, param);
+  taosArrayEnsureCap(q->queue->container, maxSize + 1);
+  q->maxSize = maxSize;
+  return q;
+}
+
+void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn) { taosPQSetFn(q->queue, fn); }
+
+void destroyBoundedQueue(BoundedQueue* q) {
+  if (!q) return;
+  destroyPriorityQueue(q->queue);
+  taosMemoryFree(q);
+}
+
+void taosBQPush(BoundedQueue* q, PriorityQueueNode* n) {
+  if (pqContainerSize(q->queue) == q->maxSize + 1) {
+    PriorityQueueNode* top = pqContainerGetEle(q->queue, 0);
+    void*              p = top->data;
+    top->data = n->data;
+    n->data = p;
+    if (q->queue->deleteFn) q->queue->deleteFn(n->data);
+    pqHeapify(q->queue, 0, taosBQSize(q));
+  } else {
+    taosPQPush(q->queue, n);
+  }
+}
+
+PriorityQueueNode* taosBQTop(BoundedQueue* q) { return taosPQTop(q->queue); }
+
+void taosBQBuildHeap(BoundedQueue* q) { pqBuildHeap(q->queue); }
+
+size_t taosBQMaxSize(BoundedQueue* q) { return q->maxSize; }
+
+size_t taosBQSize(BoundedQueue* q) { return taosPQSize(q->queue); }
+
+void taosBQPop(BoundedQueue* q) { taosPQPop(q->queue); }
diff --git
a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index f561bd4ed7b4cca5cbcd2be0ceb0188f7db307c0..1dff2a90d022ac9bd2e3328c09354d7a6cfe3a2f 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -33,6 +33,8 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb3.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py diff --git a/tests/script/tsim/parser/limit1_stb.sim b/tests/script/tsim/parser/limit1_stb.sim index 731a218de5d11f11b623a8348344597fb84f78b8..027a4f5c797133979a90a352f5d2b58fd67094ff 100644 --- a/tests/script/tsim/parser/limit1_stb.sim +++ b/tests/script/tsim/parser/limit1_stb.sim @@ -468,7 +468,7 @@ if $data01 != 1 then endi ## supertable aggregation + where + interval + group by order by tag + limit offset -sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9),t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 2 offset 0 +sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9),t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 2 offset 0 if $rows != 2 then return -1 endi diff --git a/tests/script/tsim/parser/limit_stb.sim b/tests/script/tsim/parser/limit_stb.sim index 6950df9ee1b41816feca5c8753efd14489fda063..46bd6260c3b8c726e24e8e07ab0d88f94872a5e9 100644 --- a/tests/script/tsim/parser/limit_stb.sim +++ b/tests/script/tsim/parser/limit_stb.sim @@ -508,7 +508,7 @@ endi ### supertable aggregation + where + interval + group by order by tag + limit offset ## TBASE-345 -sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 3 offset 0 +sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 3 offset 0 if $rows != 3 then return -1 endi @@ -554,7 +554,7 @@ if $data09 != 4 then return -1 endi -sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 and c1 > 0 and c2 < 9 and c3 > 4 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 3 offset 0 +sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 and c1 > 0 and c2 < 9 and c3 > 4 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 3 offset 0 if $rows != 3 then return -1 endi diff --git a/tests/script/tsim/parser/union.sim b/tests/script/tsim/parser/union.sim index 
dee5da96e8488a14cc615d7ab3944ccf6558f93e..f0c534ad11b336cade9d79e2f652742ed2dfbd86 100644 --- a/tests/script/tsim/parser/union.sim +++ b/tests/script/tsim/parser/union.sim @@ -126,7 +126,6 @@ endi if $data10 != 1 then return -1 endi - sql (select 'ab' as options from union_tb1 limit 1) union all (select 'dd' as options from union_tb0 limit 1) order by options; if $rows != 2 then return -1 diff --git a/tests/script/tsim/query/r/explain_tsorder.result b/tests/script/tsim/query/r/explain_tsorder.result index 6c63a343de1b945ce681aaceadedfabb612f2136..b69a77ada52d5f6c819a5edb5a292a027d9f320e 100644 --- a/tests/script/tsim/query/r/explain_tsorder.result +++ b/tests/script/tsim/query/r/explain_tsorder.result @@ -2558,3 +2558,243 @@ taos> select a.ts, a.c2, b.c2 from meters as a join (select * from meters order 2022-05-24 00:01:08.000 | 210 | 210 | 2022-05-24 00:01:08.000 | 210 | 210 | +taos> select ts, c2 from meters order by c2; + ts | c2 | +======================================== + 2022-05-21 00:01:08.000 | 11 | + 2022-05-21 00:01:08.000 | 11 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-24 00:01:08.000 | 210 | + 2022-05-24 00:01:08.000 | 210 | + 2022-05-15 00:01:08.000 | 234 | + 2022-05-15 00:01:08.000 | 234 | + 2022-05-19 00:01:08.000 | 243 | + 2022-05-19 00:01:08.000 | 243 | + +taos> select ts, c2 from meters order by c2 limit 4; + ts | c2 | +======================================== + 2022-05-21 00:01:08.000 | 11 | + 2022-05-21 00:01:08.000 | 11 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-18 00:01:08.000 | 58 | + +taos> select ts, c2 from meters order by c2 limit 2,2; + ts | c2 | +======================================== + 2022-05-18 00:01:08.000 | 58 | + 2022-05-18 00:01:08.000 | 58 | + +taos> select ts, c2 from meters order by ts asc, c2 desc limit 10; + ts | c2 | +======================================== + 2022-05-15 00:01:08.000 | 234 | + 2022-05-15 00:01:08.000 | 234 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-19 00:01:08.000 | 243 | + 2022-05-19 00:01:08.000 | 243 | + +taos> select ts, c2 from meters order by ts asc, c2 desc limit 5,5; + ts | c2 | +======================================== + 2022-05-17 00:01:08.000 | 59 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-19 00:01:08.000 | 243 | + 2022-05-19 00:01:08.000 | 243 | + +taos> select ts, c2 from d1 order by c2; + ts | c2 | +======================================== + 2022-05-21 00:01:08.000 | 11 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-24 00:01:08.000 | 210 | + 2022-05-15 00:01:08.000 | 234 | + 2022-05-19 00:01:08.000 | 243 | + +taos> select ts, c2 from d1 order by c2 limit 4; + ts | c2 | +======================================== + 2022-05-21 00:01:08.000 | 11 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-23 00:01:08.000 | 116 | + +taos> select ts, c2 from d1 order by c2 
limit 2,2; + ts | c2 | +======================================== + 2022-05-17 00:01:08.000 | 59 | + 2022-05-23 00:01:08.000 | 116 | + +taos> select ts, c2 from d1 order by ts asc, c2 desc limit 10; + ts | c2 | +======================================== + 2022-05-15 00:01:08.000 | 234 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-19 00:01:08.000 | 243 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-21 00:01:08.000 | 11 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-24 00:01:08.000 | 210 | + +taos> select ts, c2 from d1 order by ts asc, c2 desc limit 5,5; + ts | c2 | +======================================== + 2022-05-20 00:01:08.000 | 120 | + 2022-05-21 00:01:08.000 | 11 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-24 00:01:08.000 | 210 | + +taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc; + _wstart | d | avg(c) | +================================================================================ + 2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 | + 2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 | + 2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 | + 2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 | + 2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 | + 2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 | + 2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 | + 2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 | + 2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000 | + 2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000 | + 2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.600000000 | + 2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000 | + 2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000 | + 2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.600000000 | + 2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000 | + 2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000 | + 2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000 | + 2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000 | + 2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.800000000 | + 2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000 | + 2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000 | + 2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000 | + 2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.800000000 | + 2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000 | + 2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.400000000 | + 2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000 | + 2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000 | + 2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000 | + 2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000 | + +taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2; + _wstart | d | 
avg(c) | +================================================================================ + 2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 | + 2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 | + +taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6; + _wstart | d | avg(c) | +================================================================================ + 2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 | + 2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 | + 2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 | + 2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 | + 2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 | + 2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 | + +taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10; + last(ts) | d | +======================================== + 2022-05-19 00:01:08.000 | 243 | + 2022-05-15 00:01:08.000 | 234 | + 2022-05-24 00:01:08.000 | 210 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-21 00:01:08.000 | 11 | + +taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 2,8; + last(ts) | d | +======================================== + 2022-05-24 00:01:08.000 | 210 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-21 00:01:08.000 | 11 | + +taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 9,1; + last(ts) | d | +======================================== + 2022-05-21 00:01:08.000 | 11 | + +taos> select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 2,8; + last(ts) | d | +======================================== + 2022-05-17 00:01:08.000 | 59 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-24 00:01:08.000 | 210 | + 2022-05-15 00:01:08.000 | 234 | + 2022-05-19 00:01:08.000 | 243 | + +taos> select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 9,1; + last(ts) | d | +======================================== + 2022-05-19 00:01:08.000 | 243 | + +taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 10; + ts | d | +======================================== + 2022-05-24 00:01:08.000 | 210 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-21 00:01:08.000 | 11 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-19 00:01:08.000 | 243 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-15 00:01:08.000 | 234 | + +taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 2,8; + ts | d | +======================================== + 2022-05-22 00:01:08.000 | 196 | + 2022-05-21 00:01:08.000 | 11 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-19 00:01:08.000 | 243 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-16 00:01:08.000 | 136 | + 
2022-05-15 00:01:08.000 | 234 | + +taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 9,1; + ts | d | +======================================== + 2022-05-15 00:01:08.000 | 234 | + diff --git a/tests/script/tsim/query/t/explain_tsorder.sql b/tests/script/tsim/query/t/explain_tsorder.sql index d3264d8895246a4151a56b28cdc7968b5969a4e0..056ac440fee299677b991d0a996ac47a2e854073 100644 --- a/tests/script/tsim/query/t/explain_tsorder.sql +++ b/tests/script/tsim/query/t/explain_tsorder.sql @@ -71,3 +71,30 @@ select a.ts, a.c2, b.c2 from meters as a join meters as b on a.ts = b.ts order b explain verbose true select a.ts, a.c2, b.c2 from meters as a join (select ts, c2 from meters order by ts desc) b on a.ts = b.ts order by a.ts desc\G; explain verbose true select a.ts, a.c2, b.c2 from meters as a join (select ts, c2 from meters order by ts desc) b on a.ts = b.ts order by a.ts asc\G; select a.ts, a.c2, b.c2 from meters as a join (select * from meters order by ts desc) b on a.ts = b.ts order by a.ts asc; + +select ts, c2 from meters order by c2; +select ts, c2 from meters order by c2 limit 4; +select ts, c2 from meters order by c2 limit 2,2; + +select ts, c2 from meters order by ts asc, c2 desc limit 10; +select ts, c2 from meters order by ts asc, c2 desc limit 5,5; + +select ts, c2 from d1 order by c2; +select ts, c2 from d1 order by c2 limit 4; +select ts, c2 from d1 order by c2 limit 2,2; + +select ts, c2 from d1 order by ts asc, c2 desc limit 10; +select ts, c2 from d1 order by ts asc, c2 desc limit 5,5; + +select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc; +select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2; +select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6; + +select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10; +select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 2,8; +select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 9,1; +select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 2,8; +select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 9,1; +select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 10; +select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 2,8; +select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 9,1; diff --git a/tests/system-test/0-others/splitVGroup.py b/tests/system-test/0-others/splitVGroup.py index 32001f34b09566acd5f1faa6d7358dc1ce1ac150..450996106608aa60cec781392bec8f9463ff2171 100644 --- a/tests/system-test/0-others/splitVGroup.py +++ b/tests/system-test/0-others/splitVGroup.py @@ -328,9 +328,28 @@ class TDTestCase: tdLog.exit("split vgroup transaction is not finished after executing 50s") return False + # split error + def expectSplitError(self, dbName): + vgids = self.getVGroup(dbName) + selid = random.choice(vgids) + sql = f"split vgroup {selid}" + 
tdLog.info(sql)
+        tdSql.error(sql)
+
+    # expect split vgroup to succeed
+    def expectSplitOk(self, dbName):
+        # split vgroup
+        vgList1 = self.getVGroup(dbName)
+        self.splitVGroup(dbName)
+        vgList2 = self.getVGroup(dbName)
+        vgNum1 = len(vgList1) + 1
+        vgNum2 = len(vgList2)
+        if vgNum1 != vgNum2:
+            tdLog.exit(f" vgroup count {vgNum2} does not match the expected count {vgNum1}")
+        return
+
     # split empty database
-    def splitEmptyDB(self):
-
+    def splitEmptyDB(self):
         dbName = "emptydb"
         vgNum = 2
         # create database
@@ -339,17 +358,33 @@ class TDTestCase:
         tdSql.execute(sql)
 
         # split vgroup
-        self.splitVGroup(dbName)
-        vgList = self.getVGroup(dbName)
-        vgNum1 = len(vgList)
-        vgNum2 = vgNum + 1
-        if vgNum1 != vgNum2:
-            tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}")
-        return
+        self.expectSplitOk(dbName)
+
+
+    # split must be forbidden while a stream or topic exists
+    def checkForbid(self):
+        # stream
+        tdLog.info("check that split is forbidden while a stream exists...")
+        tdSql.execute("create database streamdb;")
+        tdSql.execute("use streamdb;")
+        tdSql.execute("create table ta(ts timestamp, age int);")
+        tdSql.execute("create stream ma into sta as select count(*) from ta interval(1s);")
+        self.expectSplitError("streamdb")
+        tdSql.execute("drop stream ma;")
+        self.expectSplitOk("streamdb")
+
+        # topic
+        tdLog.info("check that split is forbidden while a topic exists...")
+        tdSql.execute("create database topicdb wal_retention_period 10;")
+        tdSql.execute("use topicdb;")
+        tdSql.execute("create table ta(ts timestamp, age int);")
+        tdSql.execute("create topic toa as select * from ta;")
+        self.expectSplitError("topicdb")
+        tdSql.execute("drop topic toa;")
+        self.expectSplitOk("topicdb")
 
     # run
     def run(self):
-
         # prepare env
         self.prepareEnv()
@@ -360,12 +395,13 @@
 
         # check two db query result same
         self.checkResult()
-
         tdLog.info(f"split vgroup i={i} passed.")
 
         # split empty db
-        self.splitEmptyDB()
+        self.splitEmptyDB()
 
+        # check that split is forbidden with topics and streams
+        self.checkForbid()
 
     # stop
     def stop(self):
diff --git a/tests/system-test/2-query/limit.py b/tests/system-test/2-query/limit.py
index c00e3b7d56af46f662ddb855a34e154e7c723b80..4774602d691cbcb516e3400ae14eea0a62d7fe0b 100644
--- a/tests/system-test/2-query/limit.py
+++ b/tests/system-test/2-query/limit.py
@@ -321,7 +321,7 @@ class TDTestCase:
         limit = 5
         offset = paraDict["rowsPerTbl"] * 2
         offset = offset - 2
-        sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1 limit %d offset %d"%(limit, offset)
+        sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1, max(c1) limit %d offset %d"%(limit, offset)
         # tdLog.info("====sql:%s"%(sqlStr))
         tdSql.query(sqlStr)
         tdSql.checkRows(1)
diff --git a/tests/system-test/7-tmq/tmqParamsTest.py b/tests/system-test/7-tmq/tmqParamsTest.py
new file mode 100644
index 0000000000000000000000000000000000000000..82000ba02a70569eca57abd30f629feabf290cb0
--- /dev/null
+++ b/tests/system-test/7-tmq/tmqParamsTest.py
@@ -0,0 +1,178 @@
+
+import sys
+import time
+import threading
+from taos.tmq import Consumer
+from util.log import *
+from util.sql import *
+from util.cases import *
+from util.dnodes import *
+from util.common import *
+sys.path.append("./7-tmq")
+from tmqCommon import *
+
+class TDTestCase:
+    updatecfgDict = {'debugFlag': 135}
+    def init(self, conn, logSql, replicaVar=1):
+        self.replicaVar = int(replicaVar)
+        tdLog.debug(f"start to execute {__file__}")
+        tdSql.init(conn.cursor())
+        self.wal_retention_period1 = 3600
+        self.wal_retention_period2 = 1
+        self.commit_value_list = ["true", "false"]
+        self.offset_value_list = ["", "earliest", "latest", "none"]
+        self.tbname_value_list = ["true", "false"]
+        self.snapshot_value_list = ["true", "false"]
+
+        # self.commit_value_list = ["true"]
+        # self.offset_value_list = ["none"]
+        # self.tbname_value_list = ["true"]
+        # self.snapshot_value_list = ["true"]
+
+    def tmqParamsTest(self):
+        paraDict = {'dbName': 'db1',
+                    'dropFlag': 1,
+                    'vgroups': 4,
+                    'stbName': 'stb',
+                    'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+                    'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+                    'ctbPrefix': 'ctb',
+                    'ctbNum': 1,
+                    'rowsPerTbl': 10000,
+                    'batchNum': 10,
+                    'startTs': 1640966400000,  # 2022-01-01 00:00:00.000
+                    'auto_commit_interval': "100"}
+
+
+        start_group_id = 1
+        for snapshot_value in self.snapshot_value_list:
+            for commit_value in self.commit_value_list:
+                for offset_value in self.offset_value_list:
+                    for tbname_value in self.tbname_value_list:
+                        topic_name = 'topic1'
+                        tmqCom.initConsumerTable()
+                        tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
+                        tdLog.info("create stb")
+                        tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
+                        tdLog.info("create ctb")
+                        tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
+                        tdLog.info("insert data")
+                        tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
+
+
+                        tdLog.info("create topics from stb with filter")
+                        queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
+                        sqlString = "create topic %s as %s" %(topic_name, queryString)
+                        tdSql.query(f'select * from information_schema.ins_databases')
+                        db_wal_retention_period_list = list(map(lambda x:x[-8] if x[0] == paraDict['dbName'] else None, tdSql.queryResult))
+                        for i in range(len(db_wal_retention_period_list)):
+                            if db_wal_retention_period_list[0] is None or db_wal_retention_period_list[-1] is None:
+                                db_wal_retention_period_list.remove(None)
+                        if snapshot_value == "true":
+                            if db_wal_retention_period_list[0] != self.wal_retention_period2:
+                                tdSql.execute(f"alter database {paraDict['dbName']} wal_retention_period {self.wal_retention_period2}")
+                                time.sleep(self.wal_retention_period2+1)
+                                tdSql.execute(f'flush database {paraDict["dbName"]}')
+                        else:
+                            if db_wal_retention_period_list[0] != self.wal_retention_period1:
+                                tdSql.execute(f"alter database {paraDict['dbName']} wal_retention_period {self.wal_retention_period1}")
+                        tdLog.info("create topic sql: %s"%sqlString)
+                        tdSql.execute(sqlString)
+                        tdSql.query(queryString)
+                        expected_res = tdSql.queryRows
+                        group_id = "csm_" + str(start_group_id)
+                        consumer_dict = {
+                            "group.id": group_id,
+                            "td.connect.user": "root",
+                            "td.connect.pass": "taosdata",
+                            "auto.commit.interval.ms": paraDict["auto_commit_interval"],
+                            "enable.auto.commit": commit_value,
+                            "auto.offset.reset": offset_value,
+                            "experimental.snapshot.enable": snapshot_value,
+                            "msg.with.table.name": 
tbname_value + } + consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0 + consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0 + consumer_ret = "earliest" if offset_value == "" else offset_value + expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}' + if len(offset_value) == 0: + del consumer_dict["auto.offset.reset"] + consumer = Consumer(consumer_dict) + consumer.subscribe([topic_name]) + tdLog.info(f"enable.auto.commit: {commit_value}, auto.offset.reset: {offset_value}, experimental.snapshot.enable: {snapshot_value}, msg.with.table.name: {tbname_value}") + stop_flag = 0 + try: + while True: + res = consumer.poll(1) + tdSql.query('show consumers;') + consumer_info = tdSql.queryResult[0][-1] + if offset_value == "latest": + if not res and stop_flag == 1: + break + else: + if not res: + break + # err = res.error() + # if err is not None: + # raise err + # val = res.value() + # for block in val: + # print(block.fetchall()) + if offset_value == "latest" and stop_flag == 0: + tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],int(round(time.time()*1000))) + stop_flag = 1 + finally: + consumer.unsubscribe() + consumer.close() + tdSql.checkEqual(consumer_info, expected_parameters) + start_group_id += 1 + tdSql.query('show subscriptions;') + subscription_info = tdSql.queryResult + if snapshot_value == "true": + if offset_value != "earliest" and offset_value != "": + if offset_value == "latest": + offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0")), subscription_info)) + tdSql.checkEqual(sum(offset_value_list) > 0, True) + rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) + tdSql.checkEqual(sum(rows_value_list), expected_res) + elif offset_value == "none": + offset_value_list = list(map(lambda x: x[-2], subscription_info)) + tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info)) + rows_value_list = list(map(lambda x: x[-1], subscription_info)) + tdSql.checkEqual(rows_value_list, [0]*len(subscription_info)) + else: + if offset_value != "none": + offset_value_str = ",".join(list(map(lambda x: x[-2], subscription_info))) + tdSql.checkEqual("tsdb" in offset_value_str, True) + rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) + tdSql.checkEqual(sum(rows_value_list), expected_res) + else: + offset_value_list = list(map(lambda x: x[-2], subscription_info)) + tdSql.checkEqual(offset_value_list, [None]*len(subscription_info)) + rows_value_list = list(map(lambda x: x[-1], subscription_info)) + tdSql.checkEqual(rows_value_list, [None]*len(subscription_info)) + else: + if offset_value != "none": + offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0")), subscription_info)) + tdSql.checkEqual(sum(offset_value_list) > 0, True) + rows_value_list = list(map(lambda x: int(x[-1]), subscription_info)) + tdSql.checkEqual(sum(rows_value_list), expected_res) + else: + offset_value_list = list(map(lambda x: x[-2], subscription_info)) + tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info)) + rows_value_list = list(map(lambda x: x[-1], subscription_info)) + tdSql.checkEqual(rows_value_list, [0]*len(subscription_info)) + tdSql.execute(f"drop topic if exists {topic_name}") + tdSql.execute(f'drop database if exists {paraDict["dbName"]}') + + def run(self): + 
self.tmqParamsTest()
+
+    def stop(self):
+        tdSql.close()
+        tdLog.success(f"{__file__} successfully executed")
+
+event = threading.Event()
+
+tdCases.addLinux(__file__, TDTestCase())
+tdCases.addWindows(__file__, TDTestCase())
diff --git a/tests/system-test/eco-system/util/Consumer.py b/tests/system-test/eco-system/util/Consumer.py
new file mode 100644
index 0000000000000000000000000000000000000000..b483253a9582efe7202ff90de22c855f87fcee3b
--- /dev/null
+++ b/tests/system-test/eco-system/util/Consumer.py
@@ -0,0 +1,82 @@
+###################################################################
+# Copyright (c) 2016 by TAOS Technologies, Inc.
+# All rights reserved.
+#
+# This file is proprietary and confidential to TAOS Technologies.
+# No part of this file may be reproduced, stored, transmitted,
+# disclosed or used in any form or by any means other than as
+# expressly provided by the written permission from Jianhui Tao
+#
+###################################################################
+
+# -*- coding: utf-8 -*-
+
+#
+# Verify that the wal_retention_period and wal_retention_size options work well
+#
+
+import taos
+from taos.tmq import Consumer
+
+import os
+import sys
+import threading
+import json
+import time
+from datetime import date
+from datetime import datetime
+from datetime import timedelta
+from os import path
+
+
+# consume messages from the given topic
+def consume_topic(topic_name, consume_cnt, wait):
+    print("start consume...")
+    consumer = Consumer(
+        {
+            "group.id": "tg2",
+            "td.connect.user": "root",
+            "td.connect.pass": "taosdata",
+            "enable.auto.commit": "true",
+        }
+    )
+    print("start subscribe...")
+    consumer.subscribe([topic_name])
+
+    cnt = 0
+    try:
+        while cnt < consume_cnt:
+            res = consumer.poll(1)
+            if not res:
+                if wait:
+                    continue
+                else:
+                    break
+            err = res.error()
+            if err is not None:
+                raise err
+            val = res.value()
+            cnt += 1
+            print(f" consume {cnt} ")
+            for block in val:
+                print(block.fetchall())
+    finally:
+        consumer.unsubscribe()
+        consumer.close()
+
+
+if __name__ == "__main__":
+    print(sys.argv)
+    if len(sys.argv) < 2:
+
+        print(" usage: python3 Consumer.py [-c] <topic>  (-c means keep waiting for new messages)")
-c for wait") + else: + wait = False + if "-c" == sys.argv[1]: + wait = True + topic = sys.argv[2] + else: + topic = sys.argv[1] + + print(f' wait={wait} topic={topic}') + consume_topic(topic, 10000000, wait) \ No newline at end of file diff --git a/tests/system-test/eco-system/util/restartDnodes.py b/tests/system-test/eco-system/util/restartDnodes.py new file mode 100644 index 0000000000000000000000000000000000000000..feee260fdf49957577a2204831de54e8183b222c --- /dev/null +++ b/tests/system-test/eco-system/util/restartDnodes.py @@ -0,0 +1,84 @@ +import time +import os +import subprocess +import random +import platform + +class dnode(): + def __init__(self, pid, path): + self.pid = pid + self.path = path + +# run exePath no wait finished +def runNoWait(exePath): + if platform.system().lower() == 'windows': + cmd = f"mintty -h never {exePath}" + else: + cmd = f"nohup {exePath} > /dev/null 2>&1 & " + + if os.system(cmd) != 0: + return False + else: + return True + +# get online dnodes +def getDnodes(): + cmd = "ps aux | grep taosd | awk '{{print $2,$11,$12,$13}}'" + result = os.system(cmd) + result=subprocess.check_output(cmd,shell=True) + strout = result.decode('utf-8').split("\n") + dnodes = [] + + for line in strout: + cols = line.split(' ') + if len(cols) != 4: + continue + exepath = cols[1] + if len(exepath) < 5 : + continue + if exepath[-5:] != 'taosd': + continue + + # add to list + path = cols[1] + " " + cols[2] + " " + cols[3] + dnodes.append(dnode(cols[0], path)) + + print(" show dnodes cnt=%d...\n"%(len(dnodes))) + for dn in dnodes: + print(f" pid={dn.pid} path={dn.path}") + + return dnodes + +def restartDnodes(dnodes, cnt, seconds): + print(f"start dnode cnt={cnt} wait={seconds}s") + selects = random.sample(dnodes, cnt) + for select in selects: + print(f" kill -9 {select.pid}") + cmd = f"kill -9 {select.pid}" + os.system(cmd) + print(f" restart {select.path}") + if runNoWait(select.path) == False: + print(f"run {select.path} failed.") + raise Exception("exe failed.") + print(f" sleep {seconds}s ...") + time.sleep(seconds) + +def run(): + # kill seconds interval + killLoop = 10 + minKill = 1 + maxKill = 10 + for i in range(killLoop): + dnodes = getDnodes() + killCnt = 0 + if len(dnodes) > 0: + killCnt = random.randint(1, len(dnodes)) + restartDnodes(dnodes, killCnt, random.randint(1, 5)) + + seconds = random.randint(minKill, maxKill) + print(f"----------- kill loop i={i} killCnt={killCnt} done. do sleep {seconds}s ... \n") + time.sleep(seconds) + + +if __name__ == '__main__': + run() \ No newline at end of file