提交 6cd0f98e 编写于 作者: wmmhello's avatar wmmhello

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/TS-3495

......@@ -24,6 +24,36 @@ The source code for the Python connector is hosted on [GitHub](https://github.co
We recommend using the latest version of `taospy`, regardless of the version of TDengine.
## Handling Exceptions
There are 4 types of exception in python connector.
- The exception of Python Connector itself.
- The exception of native library.
- The exception of websocket
- The exception of subscription.
- The exception of other TDengine function modules.
|Error Type|Description|Suggested Actions|
|:--------:|:---------:|:---------------:|
|InterfaceError|the native library is too old that it cannot support the function|please check the TDengine client version|
|ConnectionError|connection error|please check TDengine's status and the connection params|
|DatabaseError|database error|please upgrade Python connector to latest|
|OperationalError|operation error||
|ProgrammingError|||
|StatementError|the exception of stmt||
|ResultError|||
|SchemalessError|the exception of stmt schemaless||
|TmqError|the exception of stmt tmq||
It usually uses try-expect to handle exceptions in python. For exception handling, please refer to [Python Errors and Exceptions Documentation](https://docs.python.org/3/tutorial/errors.html).
All exceptions from the Python Connector are thrown directly. Applications should handle these exceptions. For example:
```python
{{#include docs/examples/python/handle_exception.py}}
```
## Supported features
- Native connections support all the core features of TDengine, including connection management, SQL execution, bind interface, subscriptions, and schemaless writing.
......@@ -343,6 +373,8 @@ For a more detailed description of the `sql()` method, please refer to [RestClie
</TabItem>
<TabItem value="websocket" label="WebSocket connection">
The `Connection` class contains both an implementation of the PEP249 Connection interface (e.g., the `cursor()` method and the `close()` method) and many extensions (e.g., the `execute()`, `query()`, `schemaless_insert()`, and `subscribe()` methods).
```python
{{#include docs/examples/python/connect_websocket_examples.py:basic}}
```
......@@ -353,6 +385,46 @@ For a more detailed description of the `sql()` method, please refer to [RestClie
</TabItem>
</Tabs>
### Querying Data
<Tabs defaultValue="rest">
<TabItem value="native" label="native connection">
The `query` method of the `TaosConnection` class can be used to query data and return the result data of type `TaosResult`.
```python
{{#include docs/examples/python/connection_usage_native_reference.py:query}}
```
:::tip
The queried results can only be fetched once. For example, only one of `fetch_all()` and `fetch_all_into_dict()` can be used in the example above. Repeated fetches will result in an empty list.
:::
</TabItem>
<TabItem value="rest" label="REST connection">
The `RestClient` class is a direct wrapper for the [REST API](/reference/rest-api). It contains only a `sql()` method for executing arbitrary SQL statements and returning the result.
```python
{{#include docs/examples/python/rest_client_example.py}}
```
For a more detailed description of the `sql()` method, please refer to [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html).
</TabItem>
<TabItem value="websocket" label="WebSocket connection">
The `query` method of the `TaosConnection` class can be used to query data and return the result data of type `TaosResult`.
```python
{{#include docs/examples/python/connect_websocket_examples.py:basic}}
```
</TabItem>
</Tabs>
### Usage with req_id
By using the optional req_id parameter, you can specify a request ID that can be used for tracing.
......@@ -811,14 +883,6 @@ bind multiple rows at once |
## Other notes
### Exception handling
All errors from database operations are thrown directly as exceptions and the error message from the database is passed up the exception stack. The application is responsible for exception handling. For example:
```python
{{#include docs/examples/python/handle_exception.py}}
```
### About nanoseconds
Due to the current imperfection of Python's nanosecond support (see link below), the current implementation returns integers at nanosecond precision instead of the `datetime` type produced by `ms` and `us`, which application developers will need to handle on their own. And it is recommended to use pandas' to_datetime(). The Python Connector may modify the interface in the future if Python officially supports nanoseconds in full.
......
......@@ -25,6 +25,36 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
无论使用什么版本的 TDengine 都建议使用最新版本的 `taospy`。
## 处理异常
Python 连接器可能会产生 4 种异常:
- Python 连接器本身的异常
- 原生连接方式的异常
- websocket 连接方式异常
- 数据订阅异常
- TDengine 其他功能模块的异常
|Error Type|Description|Suggested Actions|
|:--------:|:---------:|:---------------:|
|InterfaceError|taosc 版本太低,不支持所使用的接口|请检查 TDengine 客户端版本|
|ConnectionError|数据库链接错误|请检查 TDengine 服务端状态和连接参数|
|DatabaseError|数据库错误|请检查 TDengine 服务端版本,并将 Python 连接器升级到最新版|
|OperationalError|操作错误|API 使用错误,请检查代码|
|ProgrammingError|||
|StatementError|stmt 相关异常||
|ResultError|||
|SchemalessError|schemaless 相关异常||
|TmqError|tmq 相关异常||
Python 中通常通过 try-expect 处理异常,异常处理相关请参考 [Python 错误和异常文档](https://docs.python.org/3/tutorial/errors.html)。
Python Connector 的所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:
```python
{{#include docs/examples/python/handle_exception.py}}
```
## 支持的功能
- 原生连接支持 TDengine 的所有核心功能, 包括: 连接管理、执行 SQL、参数绑定、订阅、无模式写入(schemaless)。
......@@ -32,7 +62,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con
## 安装
### 准备
### 安装前准备
1. 安装 Python。新近版本 taospy 包要求 Python 3.6.2+。早期版本 taospy 包要求 Python 3.7+。taos-ws-py 包要求 Python 3.7+。如果系统上还没有 Python 可参考 [Python BeginnersGuide](https://wiki.python.org/moin/BeginnersGuide/Download) 安装。
2. 安装 [pip](https://pypi.org/project/pip/)。大部分情况下 Python 的安装包都自带了 pip 工具, 如果没有请参考 [pip documentation](https://pip.pypa.io/en/stable/installation/) 安装。
......@@ -274,7 +304,7 @@ Transfer-Encoding: chunked
</TabItem>
</Tabs>
## 示例程序
## 使用示例
### 基本使用
......@@ -343,6 +373,10 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
#### Connection 类的使用
`Connection` 类既包含对 PEP249 Connection 接口的实现(如:cursor方法和 close 方法),也包含很多扩展功能(如: execute、 query、schemaless_insert 和 subscribe 方法。
```python
{{#include docs/examples/python/connect_websocket_examples.py:basic}}
```
......@@ -353,6 +387,46 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
</TabItem>
</Tabs>
### 查询数据
<Tabs defaultValue="rest">
<TabItem value="native" label="原生连接">
`TaosConnection` 类的 `query` 方法可以用来查询数据,返回 `TaosResult` 类型的结果数据。
```python
{{#include docs/examples/python/connection_usage_native_reference.py:query}}
```
:::tip
查询结果只能获取一次。比如上面的示例中 `fetch_all()` 和 `fetch_all_into_dict()` 只能用一个。重复获取得到的结果为空列表。
:::
</TabItem>
<TabItem value="rest" label="REST 连接">
RestClient 类是对于 REST API 的直接封装。它只包含一个 sql() 方法用于执行任意 SQL 语句, 并返回执行结果。
```python
{{#include docs/examples/python/rest_client_example.py}}
```
对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
`TaosConnection` 类的 `query` 方法可以用来查询数据,返回 `TaosResult` 类型的结果数据。
```python
{{#include docs/examples/python/connect_websocket_examples.py:basic}}
```
</TabItem>
</Tabs>
### 与 req_id 一起使用
使用可选的 req_id 参数,指定请求 id,可以用于 tracing
......@@ -807,7 +881,7 @@ stmt.close()
</TabItem>
</Tabs>
### 其它示例程序
### 更多示例程序
| 示例程序链接 | 示例程序内容 |
| ------------------------------------------------------------------------------------------------------------- | ----------------------- |
......@@ -819,14 +893,6 @@ stmt.close()
## 其它说明
### 异常处理
所有数据库操作如果出现异常,都会直接抛出来。由应用程序负责异常处理。比如:
```python
{{#include docs/examples/python/handle_exception.py}}
```
``
### 关于纳秒 (nanosecond)
由于目前 Python 对 nanosecond 支持的不完善(见下面的链接),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,Python 连接器可能会修改相关接口。
......
......@@ -187,6 +187,7 @@ int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* p
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx);
void colDataTrim(SColumnInfoData* pColumnInfoData);
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
......
......@@ -184,6 +184,7 @@ extern int64_t tsStreamBufferSize;
extern int64_t tsCheckpointInterval;
extern bool tsFilterScalarMode;
extern int32_t tsMaxStreamBackendCache;
extern int32_t tsPQSortMemThreshold;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
......
......@@ -246,6 +246,7 @@ typedef struct SSortLogicNode {
SLogicNode node;
SNodeList* pSortKeys;
bool groupSort;
int64_t maxRows;
} SSortLogicNode;
typedef struct SPartitionLogicNode {
......@@ -523,6 +524,7 @@ typedef struct SSortPhysiNode {
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
SNodeList* pTargets;
int64_t maxRows;
} SSortPhysiNode;
typedef SSortPhysiNode SGroupSortPhysiNode;
......
......@@ -17,6 +17,7 @@
#define _TD_UTIL_HEAP_H_
#include "os.h"
#include "tarray.h"
#ifdef __cplusplus
extern "C" {
......@@ -58,6 +59,48 @@ void heapDequeue(Heap* heap);
size_t heapSize(Heap* heap);
typedef bool (*pq_comp_fn)(void* l, void* r, void* param);
typedef struct PriorityQueueNode {
void* data;
} PriorityQueueNode;
typedef struct PriorityQueue PriorityQueue;
PriorityQueue* createPriorityQueue(pq_comp_fn fn, FDelete deleteFn, void* param);
void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn);
void destroyPriorityQueue(PriorityQueue* pq);
PriorityQueueNode* taosPQTop(PriorityQueue* pq);
size_t taosPQSize(PriorityQueue* pq);
void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node);
void taosPQPop(PriorityQueue* pq);
typedef struct BoundedQueue BoundedQueue;
BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete deleteFn, void* param);
void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn);
void destroyBoundedQueue(BoundedQueue* q);
void taosBQPush(BoundedQueue* q, PriorityQueueNode* n);
PriorityQueueNode* taosBQTop(BoundedQueue* q);
size_t taosBQSize(BoundedQueue* q);
size_t taosBQMaxSize(BoundedQueue* q);
void taosBQBuildHeap(BoundedQueue* q);
void taosBQPop(BoundedQueue* q);
#ifdef __cplusplus
}
#endif
......
......@@ -47,6 +47,17 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo
}
}
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) {
if (colDataIsNull_s(pColumnInfoData, rowIdx)) return 0;
if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) return pColumnInfoData->info.bytes;
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON)
return getJsonValueLen(colDataGetData(pColumnInfoData, rowIdx));
else
return varDataTLen(colDataGetData(pColumnInfoData, rowIdx));
}
int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
......
......@@ -62,6 +62,7 @@ int32_t tsNumOfQnodeFetchThreads = 1;
int32_t tsNumOfSnodeStreamThreads = 4;
int32_t tsNumOfSnodeWriteThreads = 1;
int32_t tsMaxStreamBackendCache = 128; // M
int32_t tsPQSortMemThreshold = 16; // M
// sync raft
int32_t tsElectInterval = 25 * 1000;
......@@ -533,6 +534,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, 0) != 0) return -1;
GRANT_CFG_ADD;
return 0;
......@@ -914,6 +916,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval;
tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32;
tsPQSortMemThreshold = cfgGetItem(pCfg, "pqSortMemThreshold")->i32;
GRANT_CFG_GET;
return 0;
......
......@@ -79,6 +79,7 @@ typedef struct SColMatchItem {
int32_t srcSlotId;
int32_t dstSlotId;
bool needOutput;
SDataType dataType;
} SColMatchItem;
typedef struct SColMatchInfo {
......
......@@ -64,10 +64,14 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
/**
*
* @param type
* @param maxRows keep maxRows at most
* @param maxTupleLength max len of one tuple, for check if heap sort is applicable
* @param sortBufSize sort memory buf size, for check if heap sort is applicable
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr);
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
uint32_t sortBufSize);
/**
*
......
......@@ -1305,6 +1305,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
c.colId = pColNode->colId;
c.srcSlotId = pColNode->slotId;
c.dstSlotId = pNode->slotId;
c.dataType = pColNode->node.resType;
taosArrayPush(pList, &c);
}
}
......
......@@ -2801,7 +2801,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str);
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);
......
......@@ -29,6 +29,8 @@ typedef struct SSortOperatorInfo {
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SLimitInfo limitInfo;
uint64_t maxTupleLength;
int64_t maxRows;
} SSortOperatorInfo;
static SSDataBlock* doSort(SOperatorInfo* pOperator);
......@@ -36,6 +38,7 @@ static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
static void destroySortOperatorInfo(void* param);
static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys);
// todo add limit/offset impl
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
......@@ -51,6 +54,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
int32_t numOfCols = 0;
pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
pOperator->exprSupp.numOfExprs = numOfCols;
calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
pInfo->maxRows = pSortNode->maxRows;
int32_t numOfOutputCols = 0;
int32_t code =
......@@ -193,9 +198,9 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
}
pInfo->startTs = taosGetTimestampUs();
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str);
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str,
pInfo->maxRows, pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
......@@ -286,6 +291,20 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t*
return TSDB_CODE_SUCCESS;
}
static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys) {
SColMatchInfo* pColItem = &pSortOperInfo->matchInfo;
size_t size = taosArrayGetSize(pColItem->pList);
for (size_t i = 0; i < size; ++i) {
pSortOperInfo->maxTupleLength += ((SColMatchItem*)taosArrayGet(pColItem->pList, i))->dataType.bytes;
}
size = LIST_LENGTH(pSortKeys);
for (size_t i = 0; i < size; ++i) {
SOrderByExprNode* pOrderExprNode = (SOrderByExprNode*)nodesListGetNode(pSortKeys, i);
pSortOperInfo->maxTupleLength += ((SColumnNode*)pOrderExprNode->pExpr)->node.resType.bytes;
}
return TSDB_CODE_SUCCESS;
}
//=====================================================================================
// Group Sort Operator
typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus;
......@@ -384,7 +403,7 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pCurrSortHandle =
tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str);
tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0, 0);
tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);
......@@ -582,7 +601,7 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pInputBlock, pTaskInfo->id.str);
pInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort);
......
......@@ -19,6 +19,7 @@
#include "tcompare.h"
#include "tdatablock.h"
#include "tdef.h"
#include "theap.h"
#include "tlosertree.h"
#include "tpagedbuf.h"
#include "tsort.h"
......@@ -41,6 +42,12 @@ struct SSortHandle {
int64_t startTs;
uint64_t totalElapsed;
uint64_t maxRows;
uint32_t maxTupleLength;
uint32_t sortBufSize;
BoundedQueue* pBoundedQueue;
uint32_t tmpRowIdx;
int32_t sourceId;
SSDataBlock* pDataBlock;
SMsortComparParam cmpParam;
......@@ -61,6 +68,47 @@ struct SSortHandle {
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
// | offset[0] | offset[1] |....| nullbitmap | data |...|
static void* createTuple(uint32_t columnNum, uint32_t tupleLen) {
uint32_t totalLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum) + tupleLen;
return taosMemoryCalloc(1, totalLen);
}
static void destoryTuple(void* t) { taosMemoryFree(t); }
#define tupleOffset(tuple, colIdx) ((uint32_t*)(tuple + sizeof(uint32_t) * colIdx))
#define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = offset)
#define tupleSetNull(tuple, colIdx, colNum) colDataSetNull_f((char*)tuple + sizeof(uint32_t) * colNum, colIdx)
#define tupleColIsNull(tuple, colIdx, colNum) colDataIsNull_f((char*)tuple + sizeof(uint32_t) * colNum, colIdx)
#define tupleGetDataStartOffset(colNum) (sizeof(uint32_t) * colNum + BitmapLen(colNum))
#define tupleSetData(tuple, offset, data, length) memcpy(tuple + offset, data, length)
/**
* @param t the tuple pointer addr, if realloced, *t is changed to the new addr
* @param offset copy data into pTuple start from offset
* @param colIndex the columnIndex, for setting null bitmap
* @return the next offset to add field
* */
static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, size_t length,
bool isNull, uint32_t tupleLen) {
tupleSetOffset(*t, colIdx, offset);
if (isNull) {
tupleSetNull(*t, colIdx, colNum);
} else {
if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) {
*t = taosMemoryRealloc(*t, offset + length);
}
tupleSetData(*t, offset, data, length);
}
return offset + length;
}
static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
if (tupleColIsNull(t, colIdx, colNum)) return NULL;
return t + *tupleOffset(t, colIdx);
}
static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param);
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
return createOneDataBlock(pSortHandle->pDataBlock, false);
}
......@@ -71,7 +119,8 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr) {
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
uint32_t sortBufSize) {
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
pSortHandle->type = type;
......@@ -80,6 +129,13 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->pSortInfo = pSortInfo;
pSortHandle->loops = 0;
pSortHandle->maxTupleLength = maxTupleLength;
if (maxRows < 0)
pSortHandle->sortBufSize = 0;
else
pSortHandle->sortBufSize = sortBufSize;
pSortHandle->maxRows = maxRows;
if (pBlock != NULL) {
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
......@@ -150,7 +206,6 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
if (pSortHandle == NULL) {
return;
}
tsortClose(pSortHandle);
if (pSortHandle->pMergeTree != NULL) {
tMergeTreeDestroy(&pSortHandle->pMergeTree);
......@@ -159,6 +214,7 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
destroyDiskbasedBuf(pSortHandle->pBuf);
taosMemoryFreeClear(pSortHandle->idStr);
blockDataDestroy(pSortHandle->pDataBlock);
if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue);
int64_t fetchUs = 0, fetchNum = 0;
tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
......@@ -769,17 +825,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
return code;
}
int32_t tsortOpen(SSortHandle* pHandle) {
if (pHandle->opened) {
return 0;
}
if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
return -1;
}
pHandle->opened = true;
static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) {
int32_t code = createInitialSources(pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -840,7 +886,7 @@ int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
return TSDB_CODE_SUCCESS;
}
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
static STupleHandle* tsortBufMergeSortNextTuple(SSortHandle* pHandle) {
if (tsortIsClosed(pHandle)) {
return NULL;
}
......@@ -890,6 +936,168 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
return &pHandle->tupleHandle;
}
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
if (pHandle->type != SORT_SINGLESOURCE_SORT) return false;
uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*));
return maxRowsFitInMemory > pHandle->maxRows;
}
static bool tsortPQCompFn(void* a, void* b, void* param) {
SSortHandle* pHandle = param;
int32_t res = pHandle->comparFn(a, b, param);
if (res < 0) return 1;
return 0;
}
static bool tsortPQComFnReverse(void*a, void* b, void* param) {
SSortHandle* pHandle = param;
int32_t res = pHandle->comparFn(a, b, param);
if (res > 0) return 1;
return 0;
}
static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param) {
char* pLTuple = (char*)pLeft;
char* pRTuple = (char*)pRight;
SSortHandle* pHandle = (SSortHandle*)param;
SArray* orderInfo = (SArray*)pHandle->pSortInfo;
uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
for (int32_t i = 0; i < orderInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i);
void *lData = tupleGetField(pLTuple, pOrder->slotId, colNum);
void *rData = tupleGetField(pRTuple, pOrder->slotId, colNum);
if (!lData && !rData) continue;
if (!lData) return pOrder->nullFirst ? -1 : 1;
if (!rData) return pOrder->nullFirst ? 1 : -1;
int type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type;
__compar_fn_t fn = getKeyComparFunc(type, pOrder->order);
int ret = fn(lData, rData);
if (ret == 0) {
continue;
} else {
return ret;
}
}
return 0;
}
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destoryTuple, pHandle);
if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
tsortSetComparFp(pHandle, colDataComparFn);
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
SSortSource* source = *pSource;
pHandle->pDataBlock = NULL;
uint32_t tupleLen = 0;
PriorityQueueNode pqNode;
while (1) {
// fetch data
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
if (NULL == pBlock) break;
if (pHandle->beforeFp != NULL) {
pHandle->beforeFp(pBlock, pHandle->param);
}
if (pHandle->pDataBlock == NULL) {
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
if (pHandle->pDataBlock == NULL) return TSDB_CODE_OUT_OF_MEMORY;
size_t colNum = blockDataGetNumOfCols(pBlock);
if (tupleLen == 0) {
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
tupleLen += pCol->info.bytes;
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
tupleLen += sizeof(VarDataLenT);
}
}
}
size_t colLen = 0;
for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) {
void* pTuple = createTuple(colNum, tupleLen);
if (pTuple == NULL) return TSDB_CODE_OUT_OF_MEMORY;
uint32_t offset = tupleGetDataStartOffset(colNum);
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
if (colDataIsNull_s(pCol, rowIdx)) {
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen);
} else {
colLen = colDataGetRowLength(pCol, rowIdx);
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false,
tupleLen);
}
}
pqNode.data = pTuple;
taosBQPush(pHandle->pBoundedQueue, &pqNode);
}
}
return TSDB_CODE_SUCCESS;
}
static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
blockDataCleanup(pHandle->pDataBlock);
blockDataEnsureCapacity(pHandle->pDataBlock, 1);
// abondan the top tuple if queue size bigger than max size
if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) {
taosBQPop(pHandle->pBoundedQueue);
}
if (pHandle->tmpRowIdx == 0) {
// sort the results
taosBQSetFn(pHandle->pBoundedQueue, tsortPQComFnReverse);
taosBQBuildHeap(pHandle->pBoundedQueue);
}
if (taosBQSize(pHandle->pBoundedQueue) > 0) {
uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
PriorityQueueNode* node = taosBQTop(pHandle->pBoundedQueue);
char* pTuple = (char*)node->data;
for (uint32_t i = 0; i < colNum; ++i) {
void* pData = tupleGetField(pTuple, i, colNum);
if (!pData) {
colDataSetNULL(bdGetColumnInfoData(pHandle->pDataBlock, i), 0);
} else {
colDataAppend(bdGetColumnInfoData(pHandle->pDataBlock, i), 0, pData, false);
}
}
pHandle->pDataBlock->info.rows++;
pHandle->tmpRowIdx++;
taosBQPop(pHandle->pBoundedQueue);
}
if (pHandle->pDataBlock->info.rows == 0) return NULL;
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
return &pHandle->tupleHandle;
}
int32_t tsortOpen(SSortHandle* pHandle) {
if (pHandle->opened) {
return 0;
}
if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
return -1;
}
pHandle->opened = true;
if (tsortIsPQSortApplicable(pHandle))
return tsortOpenForPQSort(pHandle);
else
return tsortOpenForBufMergeSort(pHandle);
}
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
if (pHandle->pBoundedQueue)
return tsortPQSortNextTuple(pHandle);
else
return tsortBufMergeSortNextTuple(pHandle);
}
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
SColumnInfoData* pColInfoSrc = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex);
return colDataIsNull_s(pColInfoSrc, pVHandle->rowIndex);
......
......@@ -502,6 +502,7 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pSortKeys);
COPY_SCALAR_FIELD(groupSort);
COPY_SCALAR_FIELD(maxRows);
return TSDB_CODE_SUCCESS;
}
......
......@@ -2100,6 +2100,7 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
static const char* jkSortPhysiPlanExprs = "Exprs";
static const char* jkSortPhysiPlanSortKeys = "SortKeys";
static const char* jkSortPhysiPlanTargets = "Targets";
static const char* jkSortPhysiPlanMaxRows = "MaxRows";
static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
......@@ -2114,6 +2115,9 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSortPhysiPlanMaxRows, pNode->maxRows);
}
return code;
}
......@@ -2131,6 +2135,9 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkSortPhysiPlanMaxRows, &pNode->maxRows);
}
return code;
}
......
......@@ -2594,7 +2594,7 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS };
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, PHY_SORT_CODE_MAX_ROWS };
static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
......@@ -2609,6 +2609,9 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_SORT_CODE_MAX_ROWS, pNode->maxRows);
}
return code;
}
......@@ -2632,6 +2635,9 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_SORT_CODE_TARGETS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
break;
case PHY_SORT_CODE_MAX_ROWS:
code = tlvDecodeI64(pTlv, &pNode->maxRows);
break;
default:
break;
}
......
......@@ -1027,6 +1027,7 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->maxRows = -1;
pSort->groupSort = pSelect->groupSort;
pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
......@@ -1298,6 +1299,7 @@ static int32_t createSetOpSortLogicNode(SLogicPlanContext* pCxt, SSetOperator* p
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->maxRows = -1;
TSWAP(pSort->node.pLimit, pSetOperator->pLimit);
int32_t code = TSDB_CODE_SUCCESS;
......
......@@ -123,7 +123,7 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order, SLogicNode* pNode
pNode->inputTsOrder = order;
switch (nodeType(pNode)) {
// for those nodes that will change the order, stop propagating
//case QUERY_NODE_LOGIC_PLAN_WINDOW:
// case QUERY_NODE_LOGIC_PLAN_WINDOW:
case QUERY_NODE_LOGIC_PLAN_JOIN:
case QUERY_NODE_LOGIC_PLAN_AGG:
case QUERY_NODE_LOGIC_PLAN_SORT:
......@@ -769,8 +769,9 @@ static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond)
}
SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft);
SColumnNode* pRight = (SColumnNode*)(pOper->pRight);
//TODO: add cast to operator and remove this restriction of optimization
if (pLeft->node.resType.type != pRight->node.resType.type || pLeft->node.resType.bytes != pRight->node.resType.bytes) {
// TODO: add cast to operator and remove this restriction of optimization
if (pLeft->node.resType.type != pRight->node.resType.type ||
pLeft->node.resType.bytes != pRight->node.resType.bytes) {
return false;
}
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
......@@ -2575,7 +2576,7 @@ static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) {
SLogicNode* pNode = tagScanOptFindAncestorWithSlimit(pTableScanNode);
if (NULL != pNode) {
//TODO: only set the slimit now. push down slimit later
// TODO: only set the slimit now. push down slimit later
pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit);
((SLimitNode*)pTableScanNode->pSlimit)->limit += ((SLimitNode*)pTableScanNode->pSlimit)->offset;
((SLimitNode*)pTableScanNode->pSlimit)->offset = 0;
......@@ -2629,8 +2630,16 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
}
static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) {
if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren)) {
return false;
}
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
SLimitNode* pChildLimit = (SLimitNode*)(pChild->pLimit);
// if we have pushed down, we skip it
if ((*(SSortLogicNode*)pChild).maxRows != -1) return false;
} else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) {
return false;
}
return true;
......@@ -2644,8 +2653,18 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
nodesDestroyNode(pChild->pLimit);
if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
SLimitNode* pLimitNode = (SLimitNode*)pNode->pLimit;
int64_t maxRows = -1;
if (pLimitNode->limit != -1) {
maxRows = pLimitNode->limit;
if (pLimitNode->offset != -1) maxRows += pLimitNode->offset;
}
((SSortLogicNode*)pChild)->maxRows = maxRows;
} else {
pChild->pLimit = pNode->pLimit;
pNode->pLimit = NULL;
}
pCxt->optimized = true;
return TSDB_CODE_SUCCESS;
......@@ -2898,7 +2917,7 @@ static SSortLogicNode* sortNonPriKeySatisfied(SLogicNode* pNode) {
if (sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys)) {
return NULL;
}
SNode* pSortKeyNode = NULL, *pSortKeyExpr = NULL;
SNode *pSortKeyNode = NULL, *pSortKeyExpr = NULL;
FOREACH(pSortKeyNode, pSort->pSortKeys) {
pSortKeyExpr = ((SOrderByExprNode*)pSortKeyNode)->pExpr;
switch (nodeType(pSortKeyExpr)) {
......
......@@ -1374,6 +1374,7 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
if (NULL == pSort) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->maxRows = pSortLogicNode->maxRows;
SNodeList* pPrecalcExprs = NULL;
SNodeList* pSortKeys = NULL;
......
......@@ -1018,6 +1018,7 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
splSetParent((SLogicNode*)pPartSort);
pPartSort->pSortKeys = pSortKeys;
pPartSort->groupSort = pSort->groupSort;
pPartSort->maxRows = pSort->maxRows;
code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
}
......
......@@ -482,6 +482,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) {
sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(),
pEntry->index);
taosMsleep(1);
goto _out;
}
ASSERT(pEntry->index == pBuf->matchIndex);
......
......@@ -364,7 +364,7 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64
if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
pSyncNode->hbSlowNum++;
sNInfo(pSyncNode,
sNTrace(pSyncNode,
"recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
......
......@@ -187,3 +187,172 @@ void heapRemove(Heap* heap, HeapNode* node) {
}
void heapDequeue(Heap* heap) { heapRemove(heap, heap->min); }
struct PriorityQueue {
SArray* container;
pq_comp_fn fn;
FDelete deleteFn;
void* param;
};
PriorityQueue* createPriorityQueue(pq_comp_fn fn, FDelete deleteFn, void* param) {
PriorityQueue* pq = (PriorityQueue*)taosMemoryCalloc(1, sizeof(PriorityQueue));
pq->container = taosArrayInit(1, sizeof(PriorityQueueNode));
pq->fn = fn;
pq->deleteFn = deleteFn;
pq->param = param;
return pq;
}
void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn) {
pq->fn = fn;
}
void destroyPriorityQueue(PriorityQueue* pq) {
if (pq->deleteFn)
taosArrayDestroyP(pq->container, pq->deleteFn);
else
taosArrayDestroy(pq->container);
taosMemoryFree(pq);
}
static size_t pqParent(size_t i) { return (--i) >> 1; /* (i - 1) / 2 */ }
static size_t pqLeft(size_t i) { return (i << 1) | 1; /* i * 2 + 1 */ }
static size_t pqRight(size_t i) { return (++i) << 1; /* (i + 1) * 2 */}
static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) {
void * tmp = a->data;
a->data = b->data;
b->data = tmp;
}
#define pqContainerGetEle(pq, i) ((PriorityQueueNode*)taosArrayGet((pq)->container, (i)))
#define pqContainerSize(pq) (taosArrayGetSize((pq)->container))
size_t taosPQSize(PriorityQueue* pq) { return pqContainerSize(pq); }
static void pqHeapify(PriorityQueue* pq, size_t from, size_t last) {
size_t largest = from;
do {
from = largest;
size_t l = pqLeft(from);
size_t r = pqRight(from);
if (l < last && pq->fn(pqContainerGetEle(pq, from)->data, pqContainerGetEle(pq, l)->data, pq->param)) {
largest = l;
}
if (r < last && pq->fn(pqContainerGetEle(pq, largest)->data, pqContainerGetEle(pq, r)->data, pq->param)) {
largest = r;
}
if (largest != from) {
pqSwapPQNode(pqContainerGetEle(pq, from), pqContainerGetEle(pq, largest));
}
} while (largest != from);
}
static void pqBuildHeap(PriorityQueue* pq) {
if (pqContainerSize(pq) > 1) {
for (size_t i = pqContainerSize(pq) - 1; i > 0; --i) {
pqHeapify(pq, i, pqContainerSize(pq));
}
pqHeapify(pq, 0, pqContainerSize(pq));
}
}
static void pqReverseHeapify(PriorityQueue* pq, size_t i) {
while (i > 0 && !pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) {
size_t parentIdx = pqParent(i);
pqSwapPQNode(pqContainerGetEle(pq, i), pqContainerGetEle(pq, parentIdx));
i = parentIdx;
}
}
static void pqUpdate(PriorityQueue* pq, size_t i) {
if (i == 0 || pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) {
// if value in pos i is smaller than parent, heapify down from i to the end
pqHeapify(pq, i, pqContainerSize(pq));
} else {
// if value in pos i is big than parent, heapify up from i
pqReverseHeapify(pq, i);
}
}
static void pqRemove(PriorityQueue* pq, size_t i) {
if (i == pqContainerSize(pq) - 1) {
taosArrayPop(pq->container);
return;
}
taosArraySet(pq->container, i, taosArrayGet(pq->container, pqContainerSize(pq) - 1));
taosArrayPop(pq->container);
pqUpdate(pq, i);
}
PriorityQueueNode* taosPQTop(PriorityQueue* pq) {
return pqContainerGetEle(pq, 0);
}
void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node) {
taosArrayPush(pq->container, node);
pqReverseHeapify(pq, pqContainerSize(pq) - 1);
}
void taosPQPop(PriorityQueue* pq) {
PriorityQueueNode* top = taosPQTop(pq);
if (pq->deleteFn) pq->deleteFn(top->data);
pqRemove(pq, 0);
}
struct BoundedQueue {
PriorityQueue* queue;
uint32_t maxSize;
};
BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete deleteFn, void* param) {
BoundedQueue* q = (BoundedQueue*)taosMemoryCalloc(1, sizeof(BoundedQueue));
q->queue = createPriorityQueue(fn, deleteFn, param);
taosArrayEnsureCap(q->queue->container, maxSize + 1);
q->maxSize = maxSize;
return q;
}
void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn) {
taosPQSetFn(q->queue, fn);
}
void destroyBoundedQueue(BoundedQueue* q) {
if (!q) return;
destroyPriorityQueue(q->queue);
taosMemoryFree(q);
}
void taosBQPush(BoundedQueue* q, PriorityQueueNode* n) {
if (pqContainerSize(q->queue) == q->maxSize + 1) {
PriorityQueueNode* top = pqContainerGetEle(q->queue, 0);
void *p = top->data;
top->data = n->data;
n->data = p;
if (q->queue->deleteFn) q->queue->deleteFn(n->data);
pqHeapify(q->queue, 0, taosBQSize(q));
} else {
taosPQPush(q->queue, n);
}
}
PriorityQueueNode* taosBQTop(BoundedQueue* q) {
return taosPQTop(q->queue);
}
void taosBQBuildHeap(BoundedQueue *q) {
pqBuildHeap(q->queue);
}
size_t taosBQMaxSize(BoundedQueue* q) {
return q->maxSize;
}
size_t taosBQSize(BoundedQueue* q) {
return taosPQSize(q->queue);
}
void taosBQPop(BoundedQueue* q) {
taosPQPop(q->queue);
}
......@@ -33,6 +33,8 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb3.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqParamsTest.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py
......
......@@ -468,7 +468,7 @@ if $data01 != 1 then
endi
## supertable aggregation + where + interval + group by order by tag + limit offset
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9),t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 2 offset 0
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9),t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 2 offset 0
if $rows != 2 then
return -1
endi
......
......@@ -508,7 +508,7 @@ endi
### supertable aggregation + where + interval + group by order by tag + limit offset
## TBASE-345
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 3 offset 0
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 3 offset 0
if $rows != 3 then
return -1
endi
......@@ -554,7 +554,7 @@ if $data09 != 4 then
return -1
endi
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 and c1 > 0 and c2 < 9 and c3 > 4 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 3 offset 0
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 and c1 > 0 and c2 < 9 and c3 > 4 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 3 offset 0
if $rows != 3 then
return -1
endi
......
......@@ -126,7 +126,6 @@ endi
if $data10 != 1 then
return -1
endi
sql (select 'ab' as options from union_tb1 limit 1) union all (select 'dd' as options from union_tb0 limit 1) order by options;
if $rows != 2 then
return -1
......
......@@ -2558,3 +2558,243 @@ taos> select a.ts, a.c2, b.c2 from meters as a join (select * from meters order
2022-05-24 00:01:08.000 | 210 | 210 |
2022-05-24 00:01:08.000 | 210 | 210 |
taos> select ts, c2 from meters order by c2;
ts | c2 |
========================================
2022-05-21 00:01:08.000 | 11 |
2022-05-21 00:01:08.000 | 11 |
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
2022-05-23 00:01:08.000 | 116 |
2022-05-20 00:01:08.000 | 120 |
2022-05-20 00:01:08.000 | 120 |
2022-05-16 00:01:08.000 | 136 |
2022-05-16 00:01:08.000 | 136 |
2022-05-22 00:01:08.000 | 196 |
2022-05-22 00:01:08.000 | 196 |
2022-05-24 00:01:08.000 | 210 |
2022-05-24 00:01:08.000 | 210 |
2022-05-15 00:01:08.000 | 234 |
2022-05-15 00:01:08.000 | 234 |
2022-05-19 00:01:08.000 | 243 |
2022-05-19 00:01:08.000 | 243 |
taos> select ts, c2 from meters order by c2 limit 4;
ts | c2 |
========================================
2022-05-21 00:01:08.000 | 11 |
2022-05-21 00:01:08.000 | 11 |
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
taos> select ts, c2 from meters order by c2 limit 2,2;
ts | c2 |
========================================
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
taos> select ts, c2 from meters order by ts asc, c2 desc limit 10;
ts | c2 |
========================================
2022-05-15 00:01:08.000 | 234 |
2022-05-15 00:01:08.000 | 234 |
2022-05-16 00:01:08.000 | 136 |
2022-05-16 00:01:08.000 | 136 |
2022-05-17 00:01:08.000 | 59 |
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
2022-05-19 00:01:08.000 | 243 |
2022-05-19 00:01:08.000 | 243 |
taos> select ts, c2 from meters order by ts asc, c2 desc limit 5,5;
ts | c2 |
========================================
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
2022-05-19 00:01:08.000 | 243 |
2022-05-19 00:01:08.000 | 243 |
taos> select ts, c2 from d1 order by c2;
ts | c2 |
========================================
2022-05-21 00:01:08.000 | 11 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
2022-05-20 00:01:08.000 | 120 |
2022-05-16 00:01:08.000 | 136 |
2022-05-22 00:01:08.000 | 196 |
2022-05-24 00:01:08.000 | 210 |
2022-05-15 00:01:08.000 | 234 |
2022-05-19 00:01:08.000 | 243 |
taos> select ts, c2 from d1 order by c2 limit 4;
ts | c2 |
========================================
2022-05-21 00:01:08.000 | 11 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
taos> select ts, c2 from d1 order by c2 limit 2,2;
ts | c2 |
========================================
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
taos> select ts, c2 from d1 order by ts asc, c2 desc limit 10;
ts | c2 |
========================================
2022-05-15 00:01:08.000 | 234 |
2022-05-16 00:01:08.000 | 136 |
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-19 00:01:08.000 | 243 |
2022-05-20 00:01:08.000 | 120 |
2022-05-21 00:01:08.000 | 11 |
2022-05-22 00:01:08.000 | 196 |
2022-05-23 00:01:08.000 | 116 |
2022-05-24 00:01:08.000 | 210 |
taos> select ts, c2 from d1 order by ts asc, c2 desc limit 5,5;
ts | c2 |
========================================
2022-05-20 00:01:08.000 | 120 |
2022-05-21 00:01:08.000 | 11 |
2022-05-22 00:01:08.000 | 196 |
2022-05-23 00:01:08.000 | 116 |
2022-05-24 00:01:08.000 | 210 |
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc;
_wstart | d | avg(c) |
================================================================================
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 |
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 |
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 |
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 |
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 |
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 |
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 |
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 |
2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000 |
2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000 |
2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.600000000 |
2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000 |
2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000 |
2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.600000000 |
2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000 |
2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000 |
2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000 |
2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000 |
2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.800000000 |
2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000 |
2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000 |
2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000 |
2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.800000000 |
2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000 |
2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.400000000 |
2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000 |
2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000 |
2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000 |
2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000 |
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2;
_wstart | d | avg(c) |
================================================================================
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 |
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 |
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6;
_wstart | d | avg(c) |
================================================================================
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 |
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 |
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 |
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 |
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 |
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10;
last(ts) | d |
========================================
2022-05-19 00:01:08.000 | 243 |
2022-05-15 00:01:08.000 | 234 |
2022-05-24 00:01:08.000 | 210 |
2022-05-22 00:01:08.000 | 196 |
2022-05-16 00:01:08.000 | 136 |
2022-05-20 00:01:08.000 | 120 |
2022-05-23 00:01:08.000 | 116 |
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-21 00:01:08.000 | 11 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 2,8;
last(ts) | d |
========================================
2022-05-24 00:01:08.000 | 210 |
2022-05-22 00:01:08.000 | 196 |
2022-05-16 00:01:08.000 | 136 |
2022-05-20 00:01:08.000 | 120 |
2022-05-23 00:01:08.000 | 116 |
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-21 00:01:08.000 | 11 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 9,1;
last(ts) | d |
========================================
2022-05-21 00:01:08.000 | 11 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 2,8;
last(ts) | d |
========================================
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
2022-05-20 00:01:08.000 | 120 |
2022-05-16 00:01:08.000 | 136 |
2022-05-22 00:01:08.000 | 196 |
2022-05-24 00:01:08.000 | 210 |
2022-05-15 00:01:08.000 | 234 |
2022-05-19 00:01:08.000 | 243 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 9,1;
last(ts) | d |
========================================
2022-05-19 00:01:08.000 | 243 |
taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 10;
ts | d |
========================================
2022-05-24 00:01:08.000 | 210 |
2022-05-23 00:01:08.000 | 116 |
2022-05-22 00:01:08.000 | 196 |
2022-05-21 00:01:08.000 | 11 |
2022-05-20 00:01:08.000 | 120 |
2022-05-19 00:01:08.000 | 243 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-16 00:01:08.000 | 136 |
2022-05-15 00:01:08.000 | 234 |
taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 2,8;
ts | d |
========================================
2022-05-22 00:01:08.000 | 196 |
2022-05-21 00:01:08.000 | 11 |
2022-05-20 00:01:08.000 | 120 |
2022-05-19 00:01:08.000 | 243 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-16 00:01:08.000 | 136 |
2022-05-15 00:01:08.000 | 234 |
taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 9,1;
ts | d |
========================================
2022-05-15 00:01:08.000 | 234 |
......@@ -71,3 +71,30 @@ select a.ts, a.c2, b.c2 from meters as a join meters as b on a.ts = b.ts order b
explain verbose true select a.ts, a.c2, b.c2 from meters as a join (select ts, c2 from meters order by ts desc) b on a.ts = b.ts order by a.ts desc\G;
explain verbose true select a.ts, a.c2, b.c2 from meters as a join (select ts, c2 from meters order by ts desc) b on a.ts = b.ts order by a.ts asc\G;
select a.ts, a.c2, b.c2 from meters as a join (select * from meters order by ts desc) b on a.ts = b.ts order by a.ts asc;
select ts, c2 from meters order by c2;
select ts, c2 from meters order by c2 limit 4;
select ts, c2 from meters order by c2 limit 2,2;
select ts, c2 from meters order by ts asc, c2 desc limit 10;
select ts, c2 from meters order by ts asc, c2 desc limit 5,5;
select ts, c2 from d1 order by c2;
select ts, c2 from d1 order by c2 limit 4;
select ts, c2 from d1 order by c2 limit 2,2;
select ts, c2 from d1 order by ts asc, c2 desc limit 10;
select ts, c2 from d1 order by ts asc, c2 desc limit 5,5;
select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc;
select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2;
select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6;
select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10;
select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 2,8;
select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 9,1;
select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 2,8;
select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 9,1;
select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 10;
select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 2,8;
select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 9,1;
......@@ -328,9 +328,28 @@ class TDTestCase:
tdLog.exit("split vgroup transaction is not finished after executing 50s")
return False
# split error
def expectSplitError(self, dbName):
vgids = self.getVGroup(dbName)
selid = random.choice(vgids)
sql = f"split vgroup {selid}"
tdLog.info(sql)
tdSql.error(sql)
# expect split ok
def expectSplitOk(self, dbName):
# split vgroup
vgList1 = self.getVGroup(dbName)
self.splitVGroup(dbName)
vgList2 = self.getVGroup(dbName)
vgNum1 = len(vgList1) + 1
vgNum2 = len(vgList2)
if vgNum1 != vgNum2:
tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}")
return
# split empty database
def splitEmptyDB(self):
dbName = "emptydb"
vgNum = 2
# create database
......@@ -339,17 +358,33 @@ class TDTestCase:
tdSql.execute(sql)
# split vgroup
self.splitVGroup(dbName)
vgList = self.getVGroup(dbName)
vgNum1 = len(vgList)
vgNum2 = vgNum + 1
if vgNum1 != vgNum2:
tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}")
return
self.expectSplitOk(dbName)
# forbid
def checkForbid(self):
# stream
tdLog.info("check forbid split having stream...")
tdSql.execute("create database streamdb;")
tdSql.execute("use streamdb;")
tdSql.execute("create table ta(ts timestamp, age int);")
tdSql.execute("create stream ma into sta as select count(*) from ta interval(1s);")
self.expectSplitError("streamdb")
tdSql.execute("drop stream ma;")
self.expectSplitOk("streamdb")
# topic
tdLog.info("check forbid split having topic...")
tdSql.execute("create database topicdb wal_retention_period 10;")
tdSql.execute("use topicdb;")
tdSql.execute("create table ta(ts timestamp, age int);")
tdSql.execute("create topic toa as select * from ta;")
self.expectSplitError("topicdb")
tdSql.execute("drop topic toa;")
self.expectSplitOk("topicdb")
# run
def run(self):
# prepare env
self.prepareEnv()
......@@ -360,12 +395,13 @@ class TDTestCase:
# check two db query result same
self.checkResult()
tdLog.info(f"split vgroup i={i} passed.")
# split empty db
self.splitEmptyDB()
# check topic and stream forib
self.checkForbid()
# stop
def stop(self):
......
......@@ -321,7 +321,7 @@ class TDTestCase:
limit = 5
offset = paraDict["rowsPerTbl"] * 2
offset = offset - 2
sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1 limit %d offset %d"%(limit, offset)
sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1, max(c1) limit %d offset %d"%(limit, offset)
# tdLog.info("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(1)
......
import sys
import time
import threading
from taos.tmq import Consumer
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
self.wal_retention_period1 = 3600
self.wal_retention_period2 = 1
self.commit_value_list = ["true", "false"]
self.offset_value_list = ["", "earliest", "latest", "none"]
self.tbname_value_list = ["true", "false"]
self.snapshot_value_list = ["true", "false"]
# self.commit_value_list = ["true"]
# self.offset_value_list = ["none"]
# self.tbname_value_list = ["true"]
# self.snapshot_value_list = ["true"]
def tmqParamsTest(self):
paraDict = {'dbName': 'db1',
'dropFlag': 1,
'vgroups': 4,
'stbName': 'stb',
'colSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 1,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'auto_commit_interval': "100"}
start_group_id = 1
for snapshot_value in self.snapshot_value_list:
for commit_value in self.commit_value_list:
for offset_value in self.offset_value_list:
for tbname_value in self.tbname_value_list:
topic_name = 'topic1'
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
tdLog.info("create stb")
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
tdLog.info("create ctb")
tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
tdLog.info("insert data")
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topic_name, queryString)
tdSql.query(f'select * from information_schema.ins_databases')
db_wal_retention_period_list = list(map(lambda x:x[-8] if x[0] == paraDict['dbName'] else None, tdSql.queryResult))
for i in range(len(db_wal_retention_period_list)):
if db_wal_retention_period_list[0] is None or db_wal_retention_period_list[-1] is None:
db_wal_retention_period_list.remove(None)
if snapshot_value =="true":
if db_wal_retention_period_list[0] != self.wal_retention_period2:
tdSql.execute(f"alter database {paraDict['dbName']} wal_retention_period {self.wal_retention_period2}")
time.sleep(self.wal_retention_period2+1)
tdSql.execute(f'flush database {paraDict["dbName"]}')
else:
if db_wal_retention_period_list[0] != self.wal_retention_period1:
tdSql.execute(f"alter database {paraDict['dbName']} wal_retention_period {self.wal_retention_period1}")
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expected_res = tdSql.queryRows
group_id = "csm_" + str(start_group_id)
consumer_dict = {
"group.id": group_id,
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.commit.interval.ms": paraDict["auto_commit_interval"],
"enable.auto.commit": commit_value,
"auto.offset.reset": offset_value,
"experimental.snapshot.enable": snapshot_value,
"msg.with.table.name": tbname_value
}
consumer_commit = 1 if consumer_dict["enable.auto.commit"] == "true" else 0
consumer_tbname = 1 if consumer_dict["msg.with.table.name"] == "true" else 0
consumer_ret = "earliest" if offset_value == "" else offset_value
expected_parameters=f'tbname:{consumer_tbname},commit:{consumer_commit},interval:{paraDict["auto_commit_interval"]},reset:{consumer_ret}'
if len(offset_value) == 0:
del consumer_dict["auto.offset.reset"]
consumer = Consumer(consumer_dict)
consumer.subscribe([topic_name])
tdLog.info(f"enable.auto.commit: {commit_value}, auto.offset.reset: {offset_value}, experimental.snapshot.enable: {snapshot_value}, msg.with.table.name: {tbname_value}")
stop_flag = 0
try:
while True:
res = consumer.poll(1)
tdSql.query('show consumers;')
consumer_info = tdSql.queryResult[0][-1]
if offset_value == "latest":
if not res and stop_flag == 1:
break
else:
if not res:
break
# err = res.error()
# if err is not None:
# raise err
# val = res.value()
# for block in val:
# print(block.fetchall())
if offset_value == "latest" and stop_flag == 0:
tmqCom.insert_data(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],int(round(time.time()*1000)))
stop_flag = 1
finally:
consumer.unsubscribe()
consumer.close()
tdSql.checkEqual(consumer_info, expected_parameters)
start_group_id += 1
tdSql.query('show subscriptions;')
subscription_info = tdSql.queryResult
if snapshot_value == "true":
if offset_value != "earliest" and offset_value != "":
if offset_value == "latest":
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) > 0, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res)
elif offset_value == "none":
offset_value_list = list(map(lambda x: x[-2], subscription_info))
tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info))
rows_value_list = list(map(lambda x: x[-1], subscription_info))
tdSql.checkEqual(rows_value_list, [0]*len(subscription_info))
else:
if offset_value != "none":
offset_value_str = ",".join(list(map(lambda x: x[-2], subscription_info)))
tdSql.checkEqual("tsdb" in offset_value_str, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res)
else:
offset_value_list = list(map(lambda x: x[-2], subscription_info))
tdSql.checkEqual(offset_value_list, [None]*len(subscription_info))
rows_value_list = list(map(lambda x: x[-1], subscription_info))
tdSql.checkEqual(rows_value_list, [None]*len(subscription_info))
else:
if offset_value != "none":
offset_value_list = list(map(lambda x: int(x[-2].replace("wal:", "").replace("earliest", "0")), subscription_info))
tdSql.checkEqual(sum(offset_value_list) > 0, True)
rows_value_list = list(map(lambda x: int(x[-1]), subscription_info))
tdSql.checkEqual(sum(rows_value_list), expected_res)
else:
offset_value_list = list(map(lambda x: x[-2], subscription_info))
tdSql.checkEqual(offset_value_list, ['none']*len(subscription_info))
rows_value_list = list(map(lambda x: x[-1], subscription_info))
tdSql.checkEqual(rows_value_list, [0]*len(subscription_info))
tdSql.execute(f"drop topic if exists {topic_name}")
tdSql.execute(f'drop database if exists {paraDict["dbName"]}')
def run(self):
self.tmqParamsTest()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
#
# The option for wal_retetion_period and wal_retention_size is work well
#
import taos
from taos.tmq import Consumer
import os
import sys
import threading
import json
import time
from datetime import date
from datetime import datetime
from datetime import timedelta
from os import path
# consume topic
def consume_topic(topic_name, consume_cnt, wait):
print("start consume...")
consumer = Consumer(
{
"group.id": "tg2",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
}
)
print("start subscrite...")
consumer.subscribe([topic_name])
cnt = 0
try:
while True and cnt < consume_cnt:
res = consumer.poll(1)
if not res:
if wait:
continue
else:
break
err = res.error()
if err is not None:
raise err
val = res.value()
cnt += 1
print(f" consume {cnt} ")
for block in val:
print(block.fetchall())
finally:
consumer.unsubscribe()
consumer.close()
if __name__ == "__main__":
print(sys.argv)
if len(sys.argv) < 2:
print(" please input topic name for consume . -c for wait")
else:
wait = False
if "-c" == sys.argv[1]:
wait = True
topic = sys.argv[2]
else:
topic = sys.argv[1]
print(f' wait={wait} topic={topic}')
consume_topic(topic, 10000000, wait)
\ No newline at end of file
import time
import os
import subprocess
import random
import platform
class dnode():
def __init__(self, pid, path):
self.pid = pid
self.path = path
# run exePath no wait finished
def runNoWait(exePath):
if platform.system().lower() == 'windows':
cmd = f"mintty -h never {exePath}"
else:
cmd = f"nohup {exePath} > /dev/null 2>&1 & "
if os.system(cmd) != 0:
return False
else:
return True
# get online dnodes
def getDnodes():
cmd = "ps aux | grep taosd | awk '{{print $2,$11,$12,$13}}'"
result = os.system(cmd)
result=subprocess.check_output(cmd,shell=True)
strout = result.decode('utf-8').split("\n")
dnodes = []
for line in strout:
cols = line.split(' ')
if len(cols) != 4:
continue
exepath = cols[1]
if len(exepath) < 5 :
continue
if exepath[-5:] != 'taosd':
continue
# add to list
path = cols[1] + " " + cols[2] + " " + cols[3]
dnodes.append(dnode(cols[0], path))
print(" show dnodes cnt=%d...\n"%(len(dnodes)))
for dn in dnodes:
print(f" pid={dn.pid} path={dn.path}")
return dnodes
def restartDnodes(dnodes, cnt, seconds):
print(f"start dnode cnt={cnt} wait={seconds}s")
selects = random.sample(dnodes, cnt)
for select in selects:
print(f" kill -9 {select.pid}")
cmd = f"kill -9 {select.pid}"
os.system(cmd)
print(f" restart {select.path}")
if runNoWait(select.path) == False:
print(f"run {select.path} failed.")
raise Exception("exe failed.")
print(f" sleep {seconds}s ...")
time.sleep(seconds)
def run():
# kill seconds interval
killLoop = 10
minKill = 1
maxKill = 10
for i in range(killLoop):
dnodes = getDnodes()
killCnt = 0
if len(dnodes) > 0:
killCnt = random.randint(1, len(dnodes))
restartDnodes(dnodes, killCnt, random.randint(1, 5))
seconds = random.randint(minKill, maxKill)
print(f"----------- kill loop i={i} killCnt={killCnt} done. do sleep {seconds}s ... \n")
time.sleep(seconds)
if __name__ == '__main__':
run()
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册