提交 d9c364d6 编写于 作者: H Haojun Liao

Merge branch '3.0' into refact/fillhistory

......@@ -123,8 +123,8 @@ ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -g3 -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -g3 -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ENDIF ()
# disable all assert
......
......@@ -77,6 +77,12 @@ ELSEIF (TD_DARWIN_64)
ENDIF ()
ENDIF ()
option(
BUILD_GEOS
"If build geos on Windows"
OFF
)
option(
BUILD_SHARED_LIBS
""
......
......@@ -56,9 +56,17 @@ IF (${CMAKE_SYSTEM_NAME} MATCHES "Linux" OR ${CMAKE_SYSTEM_NAME} MATCHES "Darwin
SET(TD_DARWIN TRUE)
SET(OSTYPE "macOS")
execute_process(COMMAND geos-config --cflags OUTPUT_VARIABLE GEOS_CFLAGS)
execute_process(COMMAND geos-config --ldflags OUTPUT_VARIABLE GEOS_LDFLAGS)
string(SUBSTRING ${GEOS_CFLAGS} 2 -1 GEOS_CFLAGS)
string(REGEX REPLACE "\n" "" GEOS_CFLAGS ${GEOS_CFLAGS})
string(SUBSTRING ${GEOS_LDFLAGS} 2 -1 GEOS_LDFLAGS)
string(REGEX REPLACE "\n" "" GEOS_LDFLAGS ${GEOS_LDFLAGS})
MESSAGE("GEOS_CFLAGS "${GEOS_CFLAGS})
MESSAGE("GEOS_LDFLAGS "${GEOS_LDFLAGS})
ADD_DEFINITIONS("-DDARWIN -Wno-tautological-pointer-compare")
INCLUDE_DIRECTORIES(/usr/local/include)
LINK_DIRECTORIES(/usr/local/lib)
INCLUDE_DIRECTORIES(${GEOS_CFLAGS})
LINK_DIRECTORIES(${GEOS_LDFLAGS})
IF (${CMAKE_SYSTEM_PROCESSOR} MATCHES "arm64")
MESSAGE("Current system arch is arm64")
......
......@@ -231,11 +231,16 @@ if(${BUILD_WITH_ROCKSDB})
if(${TD_LINUX})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result")
endif(${TD_LINUX})
MESSAGE(STATUS "CXXXX STATUS CONFIG: " ${CMAKE_CXX_FLAGS})
if(${TD_DARWIN})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized")
endif(${TD_DARWIN})
if (${TD_DARWIN_ARM64})
set(HAS_ARMV8_CRC true)
endif(${TD_DARWIN_ARM64})
if (${TD_WINDOWS})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4244 /wd4819")
endif(${TD_WINDOWS})
......@@ -248,7 +253,7 @@ if(${BUILD_WITH_ROCKSDB})
endif(${TD_DARWIN})
if(${TD_WINDOWS})
option(WITH_JNI "" ON)
option(WITH_JNI "" OFF)
endif(${TD_WINDOWS})
if(${TD_WINDOWS})
......@@ -260,7 +265,7 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_FALLOCATE "" OFF)
option(WITH_JEMALLOC "" OFF)
option(WITH_GFLAGS "" OFF)
option(PORTABLE "" ON)
option(PORTABLE "" OFF)
option(WITH_LIBURING "" OFF)
option(FAIL_ON_WARNINGS OFF)
......@@ -268,8 +273,11 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_BENCHMARK_TOOLS "" OFF)
option(WITH_TOOLS "" OFF)
option(WITH_LIBURING "" OFF)
IF (TD_LINUX)
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" ON)
ELSE()
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
ENDIF()
add_subdirectory(rocksdb EXCLUDE_FROM_ALL)
target_include_directories(
rocksdb
......
此差异已折叠。
......@@ -1008,8 +1008,7 @@ SAMPLE(expr, k)
**More explanations**:
This function cannot be used in expression calculation.
- Must be used with `PARTITION BY tbname` when it's used on a STable to force the result on each single timeline
- This function cannot be used in expression calculation.
### TAIL
......@@ -1088,7 +1087,6 @@ CSUM(expr)
- Arithmetic operation can't be performed on the result of `csum` function
- Can only be used with aggregate functions This function can be used with supertables and standard tables.
- Must be used with `PARTITION BY tbname` when it's used on a STable to force the result on each single timeline
### DERIVATIVE
......@@ -1112,7 +1110,6 @@ ignore_negative: {
**More explanation**:
- It can be used together with `PARTITION BY tbname` against a STable.
- It can be used together with a selected column. For example: select \_rowts, DERIVATIVE() from.
### DIFF
......@@ -1175,7 +1172,6 @@ MAVG(expr, k)
- Arithmetic operation can't be performed on the result of `MAVG`.
- Can only be used with data columns, can't be used with tags. - Can't be used with aggregate functions.
- Must be used with `PARTITION BY tbname` when it's used on a STable to force the result on each single timeline
### STATECOUNT
......@@ -1201,7 +1197,6 @@ STATECOUNT(expr, oper, val)
**More explanations**:
- Must be used together with `PARTITION BY tbname` when it's used on a STable to force the result into each single timeline]
- Can't be used with window operation, like interval/state_window/session_window
......@@ -1229,7 +1224,6 @@ STATEDURATION(expr, oper, val, unit)
**More explanations**:
- Must be used together with `PARTITION BY tbname` when it's used on a STable to force the result into each single timeline]
- Can't be used with window operation, like interval/state_window/session_window
......@@ -1247,7 +1241,6 @@ TWA(expr)
**Applicable table types**: standard tables and supertables
- Must be used together with `PARTITION BY tbname` to force the result into each single timeline.
## System Information Functions
......
......@@ -362,7 +362,7 @@ By using the optional req_id parameter, you can specify a request ID that can be
##### TaosConnection class
The `TaosConnection` class contains both an implementation of the PEP249 Connection interface (e.g., the `cursor()` method and the `close()` method) and many extensions (e.g., the `execute()`, `query()`, `schemaless_insert()`, and `subscribe()` methods).
As the way to connect introduced above but add `req_id` argument.
```python title="execute method"
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:insert}}
......@@ -372,13 +372,9 @@ The `TaosConnection` class contains both an implementation of the PEP249 Connect
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:query}}
```
:::tip
The queried results can only be fetched once. For example, only one of `fetch_all()` and `fetch_all_into_dict()` can be used in the example above. Repeated fetches will result in an empty list.
:::
##### Use of TaosResult class
In the above example of using the `TaosConnection` class, we have shown two ways to get the result of a query: `fetch_all()` and `fetch_all_into_dict()`. In addition, `TaosResult` also provides methods to iterate through the result set by rows (`rows_iter`) or by data blocks (`blocks_iter`). Using these two methods will be more efficient in scenarios where the query has a large amount of data.
As the way to fetch data introduced above but add `req_id` argument.
```python title="blocks_iter method"
{{#include docs/examples/python/result_set_with_req_id_examples.py}}
......@@ -391,17 +387,12 @@ The `TaosConnection` class and the `TaosResult` class already implement all the
{{#include docs/examples/python/cursor_usage_native_reference_with_req_id.py}}
```
:::note
The TaosCursor class uses native connections for write and query operations. In a client-side multi-threaded scenario, this cursor instance must remain thread exclusive and cannot be shared across threads for use, otherwise, it will result in errors in the returned results.
:::
</TabItem>
<TabItem value="rest" label="REST connection">
##### Use of TaosRestCursor class
The `TaosRestCursor` class is an implementation of the PEP249 Cursor interface.
As the way to connect introduced above but add `req_id` argument.
```python title="Use of TaosRestCursor"
{{#include docs/examples/python/connect_rest_with_req_id_examples.py:basic}}
......@@ -421,8 +412,11 @@ The `RestClient` class is a direct wrapper for the [REST API](/reference/rest-ap
For a more detailed description of the `sql()` method, please refer to [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html).
</TabItem>
<TabItem value="websocket" label="WebSocket connection">
As the way to connect introduced above but add `req_id` argument.
```python
{{#include docs/examples/python/connect_websocket_with_req_id_examples.py:basic}}
```
......
......@@ -111,7 +111,7 @@ The parameters described in this document by the effect that they have on the sy
| Attribute | Description |
| ------------- | ---------------------------------------------- |
| Applicable | Client/Server |
| Meaning | The maximum waiting time to get avaliable conn |
| Meaning | The maximum waiting time to get available conn |
| Value Range | 10-50000000(ms) |
| Default Value | 500000 |
......
此差异已折叠。
......@@ -362,7 +362,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
##### TaosConnection 类的使用
`TaosConnection` 类既包含对 PEP249 Connection 接口的实现(如:`cursor`方法和 `close` 方法),也包含很多扩展功能(如: `execute`、 `query`、`schemaless_insert` 和 `subscribe` 方法
类似上文介绍的使用方法,增加 `req_id` 参数
```python title="execute 方法"
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:insert}}
......@@ -372,13 +372,9 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:query}}
```
:::tip
查询结果只能获取一次。比如上面的示例中 `fetch_all()` 和 `fetch_all_into_dict()` 只能用一个。重复获取得到的结果为空列表。
:::
##### TaosResult 类的使用
上面 `TaosConnection` 类的使用示例中,我们已经展示了两种获取查询结果的方法: `fetch_all()` 和 `fetch_all_into_dict()`。除此之外 `TaosResult` 还提供了按行迭代(`rows_iter`)或按数据块迭代(`blocks_iter`)结果集的方法。在查询数据量较大的场景,使用这两个方法会更高效
类似上文介绍的使用方法,增加 `req_id` 参数
```python title="blocks_iter 方法"
{{#include docs/examples/python/result_set_with_req_id_examples.py}}
......@@ -391,14 +387,11 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
{{#include docs/examples/python/cursor_usage_native_reference_with_req_id.py}}
```
:::note
TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。
:::
</TabItem>
<TabItem value="rest" label="REST 连接">
类似上文介绍的使用方法,增加 `req_id` 参数。
##### TaosRestCursor 类的使用
`TaosRestCursor` 类是对 PEP249 Cursor 接口的实现。
......@@ -420,8 +413,11 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
类似上文介绍的使用方法,增加 `req_id` 参数。
```python
{{#include docs/examples/python/connect_websocket_with_req_id_examples.py:basic}}
```
......
......@@ -121,6 +121,8 @@ alter_database_option: {
| WAL_LEVEL value
| WAL_FSYNC_PERIOD value
| KEEP value
| WAL_RETENTION_PERIOD value
| WAL_RETENTION_SIZE value
}
```
......
......@@ -1001,7 +1001,6 @@ SAMPLE(expr, k)
**使用说明**
- 不能参与表达式计算;该函数可以应用在普通表和超级表上;
- 使用在超级表上的时候,需要搭配 PARTITION by tbname 使用,将结果强制规约到单个时间线。
### TAIL
......@@ -1080,7 +1079,6 @@ CSUM(expr)
- 不支持 +、-、*、/ 运算,如 csum(col1) + csum(col2)。
- 只能与聚合(Aggregation)函数一起使用。 该函数可以应用在普通表和超级表上。
- 使用在超级表上的时候,需要搭配 PARTITION BY tbname使用,将结果强制规约到单个时间线。
### DERIVATIVE
......@@ -1104,7 +1102,6 @@ ignore_negative: {
**使用说明**:
- DERIVATIVE 函数可以在由 PARTITION BY 划分出单独时间线的情况下用于超级表(也即 PARTITION BY tbname)。
- 可以与选择相关联的列一起使用。 例如: select \_rowts, DERIVATIVE() from。
### DIFF
......@@ -1167,7 +1164,6 @@ MAVG(expr, k)
- 不支持 +、-、*、/ 运算,如 mavg(col1, k1) + mavg(col2, k1);
- 只能与普通列,选择(Selection)、投影(Projection)函数一起使用,不能与聚合(Aggregation)函数一起使用;
- 使用在超级表上的时候,需要搭配 PARTITION BY tbname使用,将结果强制规约到单个时间线。
### STATECOUNT
......@@ -1193,7 +1189,6 @@ STATECOUNT(expr, oper, val)
**使用说明**
- 该函数可以应用在普通表上,在由 PARTITION BY 划分出单独时间线的情况下用于超级表(也即 PARTITION BY tbname)
- 不能和窗口操作一起使用,例如 interval/state_window/session_window。
......@@ -1221,7 +1216,6 @@ STATEDURATION(expr, oper, val, unit)
**使用说明**
- 该函数可以应用在普通表上,在由 PARTITION BY 划分出单独时间线的情况下用于超级表(也即 PARTITION BY tbname)
- 不能和窗口操作一起使用,例如 interval/state_window/session_window。
......@@ -1239,8 +1233,6 @@ TWA(expr)
**适用于**:表和超级表。
**使用说明**: TWA 函数可以在由 PARTITION BY 划分出单独时间线的情况下用于超级表(也即 PARTITION BY tbname)。
## 系统信息函数
......
......@@ -247,10 +247,17 @@ launchctl limit maxfiles
该提示是创建 db 的 vnode 数量不够了,需要的 vnode 不能超过了 dnode 中 vnode 的上限。因为系统默认是一个 dnode 中有 CPU 核数两倍的 vnode,也可以通过配置文件中的参数 supportVnodes 控制。
正常调大 taos.cfg 中 supportVnodes 参数即可。
### 21 【查询】在服务器上的使用 tao-CLI 能查到指定时间段的数据,但在客户端机器上查不到?
### 21 在服务器上的使用 taos-CLI 能查到指定时间段的数据,但在客户端机器上查不到?
这种情况是因为客户端与服务器上设置的时区不一致导致的,调整客户端与服务器的时区一致即可解决。
### 22 【表名】表名确认是存在的,但写入或查询时报表不存在错误,非常奇怪,什么原因?
### 22 表名确认是存在的,但在写入或查询时返回表名不存在,什么原因?
TDengine 中的所有名称,包括数据库名、表名等都是区分大小写的,如果这些名称在程序或 taos-CLI 中没有使用反引号(`)括起来使用,即使你输入的是大写的,引擎也会转化成小写来使用,如果名称前后加上了反引号,引擎就不会再转化成小写,会保持原样来使用。
### 23 在 taos-CLI 中查询,字段内容不能完全显示出来怎么办?
可以使用 \G 参数来竖式显示,如 show databases\G; (为了输入方便,在"\"后加 TAB 键,会自动补全后面的内容)
### 24 使用 taosBenchmark 测试工具写入数据查询很快,为什么我写入的数据查询非常慢?
TDengine 在写入数据时如果有很严重的乱序写入问题,会严重影响查询性能,所以需要在写入前解决乱序的问题。如果业务是从 kafka 消费写入,请合理设计消费者,尽可能的一个子表数据由一个消费者去消费并写入,避免由设计产生的乱序。
### 25 我想统计下前后两条写入记录之间的时间差值是多少?
使用 DIFF 函数,可以查看时间列或数值列前后两条记录的差值,非常方便,详细说明见 SQL手册->函数->DIFF
......@@ -180,6 +180,7 @@ extern int32_t tsRpcRetryInterval;
extern bool tsDisableStream;
extern int64_t tsStreamBufferSize;
extern int64_t tsCheckpointInterval;
extern bool tsFilterScalarMode;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
......
......@@ -21,8 +21,8 @@ extern "C" {
#endif
#include "tcommon.h"
#include "tvariant.h"
#include "tsimplehash.h"
#include "tvariant.h"
struct SqlFunctionCtx;
struct SResultRowEntryInfo;
......@@ -77,7 +77,7 @@ enum {
enum {
MAIN_SCAN = 0x0u,
REVERSE_SCAN = 0x1u, // todo remove it
PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan
PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan
};
typedef struct SPoint1 {
......@@ -130,43 +130,44 @@ typedef struct SSerializeDataHandle {
// incremental state storage
typedef struct STdbState {
void* rocksdb;
void** pHandle;
void* writeOpts;
void* readOpts;
void** cfOpts;
void* dbOpt;
struct SStreamTask* pOwner;
void* param;
void* env;
SListNode* pComparNode;
void* pBackendHandle;
void *rocksdb;
void **pHandle;
void *writeOpts;
void *readOpts;
void **cfOpts;
void *dbOpt;
struct SStreamTask *pOwner;
void *param;
void *env;
SListNode *pComparNode;
void *pBackend;
char idstr[64];
void* compactFactory;
void* db;
void* pStateDb;
void* pFuncStateDb;
void* pFillStateDb; // todo refactor
void* pSessionStateDb;
void* pParNameDb;
void* pParTagDb;
void* txn;
void *compactFactory;
TdThreadRwlock rwLock;
void *db;
void *pStateDb;
void *pFuncStateDb;
void *pFillStateDb; // todo refactor
void *pSessionStateDb;
void *pParNameDb;
void *pParTagDb;
void *txn;
} STdbState;
typedef struct {
STdbState* pTdbState;
struct SStreamFileState* pFileState;
int32_t number;
SSHashObj* parNameMap;
int64_t checkPointId;
int32_t taskId;
int64_t streamId;
STdbState *pTdbState;
struct SStreamFileState *pFileState;
int32_t number;
SSHashObj *parNameMap;
int64_t checkPointId;
int32_t taskId;
int64_t streamId;
} SStreamState;
typedef struct SFunctionStateStore {
int32_t (*streamStateFuncPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t (*streamStateFuncGet)(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen);
int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen);
int32_t (*streamStateFuncGet)(SStreamState *pState, const SWinKey *key, void **ppVal, int32_t *pVLen);
} SFunctionStateStore;
// sql function runtime context
......@@ -180,7 +181,7 @@ typedef struct SqlFunctionCtx {
int16_t functionId; // function id
char *pOutput; // final result output buffer, point to sdata->data
// input parameter, e.g., top(k, 20), the number of results of top query is kept in param
SFunctParam *param;
SFunctParam *param;
// corresponding output buffer for timestamp of each result, e.g., diff/csum
SColumnInfoData *pTsOutput;
int32_t numOfParams;
......
......@@ -34,7 +34,24 @@ extern "C" {
// SListNode* streamBackendAddCompare(void* backend, void* arg);
// void streamBackendDelCompare(void* backend, void* arg);
//typedef struct STdbState {
// <<<<<<< HEAD
// typedef struct STdbState {
// rocksdb_t* rocksdb;
// rocksdb_column_family_handle_t** pHandle;
// rocksdb_writeoptions_t* writeOpts;
// rocksdb_readoptions_t* readOpts;
// rocksdb_options_t** cfOpts;
// rocksdb_options_t* dbOpt;
// struct SStreamTask* pOwner;
// void* param;
// void* env;
// SListNode* pComparNode;
// void* pBackend;
// char idstr[64];
// void* compactFactory;
// TdThreadRwlock rwLock;
// =======
// typedef struct STdbState {
// rocksdb_t* rocksdb;
// rocksdb_column_family_handle_t** pHandle;
// rocksdb_writeoptions_t* writeOpts;
......@@ -58,6 +75,7 @@ extern "C" {
// TTB* pParTagDb;
// TXN* txn;
//} STdbState;
//>>>>>>> enh/dev3.0
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
void streamStateClose(SStreamState* pState, bool remove);
......
......@@ -158,6 +158,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN
*/
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableName(JNIEnv *, jobject, jlong);
/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: tmqGetOffset
* Signature: (J)Ljava/lang/String;
*/
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *, jobject, jlong);
/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: fetchBlockImp
......@@ -166,6 +173,12 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *, jobject, jlong, jlong,
jobject, jobject);
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *, jobject, jlong, jstring, jint,
jlong);
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong,
jstring, jobject);
#ifdef __cplusplus
}
#endif
......
......@@ -17,9 +17,16 @@
#include "jniCommon.h"
#include "taos.h"
int __init_tmq = 0;
int __init_tmq = 0;
jmethodID g_offsetCallback;
jclass g_assignmentClass;
jmethodID g_assignmentConstructor;
jmethodID g_assignmentSetVgId;
jmethodID g_assignmentSetCurrentOffset;
jmethodID g_assignmentSetBegin;
jmethodID g_assignmentSetEnd;
void tmqGlobalMethod(JNIEnv *env) {
// make sure init function executed once
switch (atomic_val_compare_exchange_32(&__init_tmq, 0, 1)) {
......@@ -46,6 +53,38 @@ void tmqGlobalMethod(JNIEnv *env) {
jniDebug("tmq method register finished");
}
int __init_assignment = 0;
void tmqAssignmentMethod(JNIEnv *env) {
// make sure init function executed once
switch (atomic_val_compare_exchange_32(&__init_assignment, 0, 1)) {
case 0:
break;
case 1:
do {
taosMsleep(0);
} while (atomic_load_32(&__init_assignment) == 1);
case 2:
return;
}
if (g_vm == NULL) {
(*env)->GetJavaVM(env, &g_vm);
}
jclass assignment = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/Assignment");
g_assignmentClass = (*env)->NewGlobalRef(env, assignment);
g_assignmentConstructor = (*env)->GetMethodID(env, g_assignmentClass, "<init>", "()V");
g_assignmentSetVgId = (*env)->GetMethodID(env, g_assignmentClass, "setVgId", "(I)V"); // int
g_assignmentSetCurrentOffset = (*env)->GetMethodID(env, g_assignmentClass, "setCurrentOffset", "(J)V"); // long
g_assignmentSetBegin = (*env)->GetMethodID(env, g_assignmentClass, "setBegin", "(J)V"); // long
g_assignmentSetEnd = (*env)->GetMethodID(env, g_assignmentClass, "setEnd", "(J)V"); // long
(*env)->DeleteLocalRef(env, assignment);
atomic_store_32(&__init_assignment, 2);
jniDebug("tmq method assignment finished");
}
// deprecated
void commit_cb(tmq_t *tmq, int32_t code, void *param) {
JNIEnv *env = NULL;
......@@ -266,8 +305,9 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
tmq_commit_async(tmq, res, commit_cb, consumer);
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
jlong jres, jobject offset) {
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj,
jlong jtmq, jlong jres,
jobject offset) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
......@@ -335,7 +375,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN
TAOS_RES *res = (TAOS_RES *)jres;
if (res == NULL) {
jniDebug("jobj:%p, invalid res handle", jobj);
return -1;
return JNI_RESULT_SET_NULL;
}
return tmq_get_vgroup_id(res);
}
......@@ -350,6 +390,15 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam
return (*env)->NewStringUTF(env, tmq_get_table_name(res));
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *env, jobject jobj, jlong jres) {
TAOS_RES *res = (TAOS_RES *)jres;
if (res == NULL) {
jniDebug("jobj:%p, invalid res handle", jobj);
return JNI_RESULT_SET_NULL;
}
return tmq_get_vgroup_offset(res);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *env, jobject jobj, jlong con,
jlong res, jobject rowobj,
jobject arrayListObj) {
......@@ -369,7 +418,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
jniDebug("jobj:%p, conn:%p, resultset:%p, no data to retrieve", jobj, tscon, (void *)res);
return JNI_FETCH_END;
} else {
jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code, taos_errstr(tres));
jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code,
taos_errstr(tres));
return JNI_RESULT_SET_NULL;
}
}
......@@ -399,3 +449,72 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len));
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *env, jobject jobj, jlong jtmq,
jstring jtopic, jint partition,
jlong offset) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
int32_t res = tmq_offset_seek(tmq, topicName, partition, offset);
if (res != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, tmq seek error, code:%d, msg:%s", jobj, res, tmq_err2str(res));
}
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return (jint)res;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *env, jobject jobj,
jlong jtmq, jstring jtopic,
jobject jarrayList) {
tmqAssignmentMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
tmq_topic_assignment *pAssign = NULL;
int32_t numOfAssignment = 0;
int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment);
if (res != TSDB_CODE_SUCCESS) {
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res,
tmq_err2str(res));
tmq_free_assignment(pAssign);
return (jint)res;
}
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
for (int i = 0; i < numOfAssignment; ++i) {
tmq_topic_assignment assignment = pAssign[i];
jobject jassignment = (*env)->NewObject(env, g_assignmentClass, g_assignmentConstructor);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetVgId, assignment.vgId);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetCurrentOffset, assignment.currentOffset);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetBegin, assignment.begin);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetEnd, assignment.end);
(*env)->CallBooleanMethod(env, jarrayList, g_arrayListAddFp, jassignment);
}
tmq_free_assignment(pAssign);
return JNI_SUCCESS;
}
......@@ -208,6 +208,7 @@ char tsUdfdLdLibPath[512] = "";
bool tsDisableStream = false;
int64_t tsStreamBufferSize = 128 * 1024 * 1024;
int64_t tsCheckpointInterval = 3 * 60 * 60 * 1000;
bool tsFilterScalarMode = false;
#ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) {
......@@ -522,6 +523,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, 0) != 0) return -1;
GRANT_CFG_ADD;
return 0;
}
......@@ -898,6 +901,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64;
tsCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i64;
tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval;
GRANT_CFG_GET;
return 0;
}
......
......@@ -986,20 +986,20 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
}
int32_t numOfVnodes = mndGetVnodesNum(pMnode, pDnode->id);
if (numOfVnodes > 0 || pMObj != NULL || pSObj != NULL || pQObj != NULL) {
bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
if (isonline && force) {
terrno = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
}
if (!isonline && !force) {
terrno = TSDB_CODE_DNODE_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
}
bool isonline = mndIsDnodeOnline(pDnode, taosGetTimestampMs());
if (isonline && force) {
terrno = TSDB_CODE_DNODE_ONLY_USE_WHEN_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
}
if (!isonline && !force) {
terrno = TSDB_CODE_DNODE_OFFLINE;
mError("dnode:%d, failed to drop since %s, vnodes:%d mnode:%d qnode:%d snode:%d", pDnode->id, terrstr(),
numOfVnodes, pMObj != NULL, pQObj != NULL, pSObj != NULL);
goto _OVER;
}
code = mndDropDnode(pMnode, pReq, pDnode, pMObj, pQObj, pSObj, numOfVnodes, force, dropReq.unsafe);
......
......@@ -87,6 +87,28 @@ target_include_directories(
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
)
IF (TD_LINUX)
target_link_libraries(
vnode
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC tfs
PUBLIC wal
PUBLIC qworker
PUBLIC sync
PUBLIC executor
PUBLIC scheduler
PUBLIC tdb
# PUBLIC bdb
# PUBLIC scalar
PUBLIC rocksdb-shared
PUBLIC transport
PUBLIC stream
PUBLIC index
)
ELSE()
target_link_libraries(
vnode
PUBLIC os
......@@ -107,6 +129,7 @@ target_link_libraries(
PUBLIC stream
PUBLIC index
)
ENDIF()
IF (TD_GRANT)
TARGET_LINK_LIBRARIES(vnode PUBLIC grant)
......
......@@ -1231,7 +1231,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
streamProcessRunReq(pTask);
} else {
if (streamTaskShouldPause(&pTask->status)) {
atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__INACTIVE);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
}
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
}
......
......@@ -128,6 +128,7 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
rocksdb_options_set_comparator(options, cmp);
rocksdb_block_based_options_set_block_cache(tableoptions, cache);
rocksdb_options_set_block_based_table_factory(options, tableoptions);
rocksdb_options_set_info_log_level(options, 2); // WARN_LEVEL
// rocksdb_options_set_inplace_update_support(options, 1);
// rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
......
......@@ -19,7 +19,7 @@
#include "trpc.h"
extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {.statEnable = true};
SCtgDebug gCTGDebug = {0};
#if 0
......
......@@ -573,8 +573,18 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SColumnInfoD
if (colDataIsNull_var(pDst, j)) {
colDataSetNull_var(pDst, numOfRows);
} else {
// fix address sanitizer error. p1 may point to memory that will change during realloc of colDataSetVal, first copy it to p2
char* p1 = colDataGetVarData(pDst, j);
colDataSetVal(pDst, numOfRows, p1, false);
int32_t len = 0;
if (pDst->info.type == TSDB_DATA_TYPE_JSON) {
len = getJsonValueLen(p1);
} else {
len = varDataTLen(p1);
}
char* p2 = taosMemoryMalloc(len);
memcpy(p2, p1, len);
colDataSetVal(pDst, numOfRows, p2, false);
taosMemoryFree(p2);
}
numOfRows += 1;
j += 1;
......
......@@ -600,9 +600,9 @@ SUdf *udfdGetOrCreateUdf(const char *udfName) {
return udf;
} else {
(*pUdfHash)->expired = true;
taosHashRemove(global.udfsHash, udfName, strlen(udfName));
fnInfo("udfd expired, check for new version. existing udf %s udf version %d, udf created time %" PRIx64,
(*pUdfHash)->name, (*pUdfHash)->version, (*pUdfHash)->createdTime);
taosHashRemove(global.udfsHash, udfName, strlen(udfName));
}
}
......
......@@ -227,8 +227,10 @@ typedef struct SFltTreeStat {
SFilterInfo *info;
} SFltTreeStat;
typedef struct SFltScalarCtx {
SNode *node;
SArray* fltSclRange;
} SFltScalarCtx;
typedef struct SFltBuildGroupCtx {
......@@ -237,6 +239,11 @@ typedef struct SFltBuildGroupCtx {
int32_t code;
} SFltBuildGroupCtx;
typedef struct {
SColumnNode *colNode;
SArray *points;
} SFltSclColumnRange;
struct SFilterInfo {
bool scalarMode;
SFltScalarCtx sclCtx;
......
此差异已折叠。
......@@ -1791,7 +1791,11 @@ void vectorNotMatch(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOu
void vectorIsNull(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t _ord) {
for (int32_t i = 0; i < pLeft->numOfRows; ++i) {
int8_t v = IS_HELPER_NULL(pLeft->columnData, i) ? 1 : 0;
if (v) {
++pOut->numOfQualified;
}
colDataSetInt8(pOut->columnData, i, &v);
colDataClearNull_f(pOut->columnData->nullbitmap, i);
}
pOut->numOfRows = pLeft->numOfRows;
}
......@@ -1799,7 +1803,11 @@ void vectorIsNull(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut,
void vectorNotNull(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t _ord) {
for (int32_t i = 0; i < pLeft->numOfRows; ++i) {
int8_t v = IS_HELPER_NULL(pLeft->columnData, i) ? 0 : 1;
if (v) {
++pOut->numOfQualified;
}
colDataSetInt8(pOut->columnData, i, &v);
colDataClearNull_f(pOut->columnData->nullbitmap, i);
}
pOut->numOfRows = pLeft->numOfRows;
}
......@@ -1812,6 +1820,13 @@ void vectorIsTrue(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut,
colDataSetInt8(pOut->columnData, i, &v);
colDataClearNull_f(pOut->columnData->nullbitmap, i);
}
{
bool v = false;
GET_TYPED_DATA(v, bool, pOut->columnData->info.type, colDataGetData(pOut->columnData, i));
if (v) {
++pOut->numOfQualified;
}
}
}
pOut->columnData->hasNull = false;
}
......@@ -1851,7 +1866,9 @@ void vectorJsonContains(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam
char *pLeftData = colDataGetVarData(pLeft->columnData, i);
getJsonValue(pLeftData, jsonKey, &isExist);
}
if (isExist) {
++pOut->numOfQualified;
}
colDataSetVal(pOutputCol, i, (const char *)(&isExist), false);
}
taosMemoryFree(jsonKey);
......
......@@ -6,13 +6,23 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
if(${BUILD_WITH_ROCKSDB})
IF (TD_LINUX)
target_link_libraries(
stream
PUBLIC rocksdb-shared tdb
PRIVATE os util transport qcom executor wal index
)
ELSE()
target_link_libraries(
stream
PUBLIC rocksdb tdb
PRIVATE os util transport qcom executor wal index
)
ENDIF()
target_include_directories(
stream
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
......
......@@ -79,17 +79,16 @@ static FORCE_INLINE int32_t syncLogReplGetNextRetryBackoff(SSyncLogReplMgr* pMgr
SyncTerm syncLogReplGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
int32_t syncLogReplDoOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index);
int32_t syncLogReplRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode);
int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index, SyncTerm* pTerm, SRaftId* pDestId,
bool* pBarrier);
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg);
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg);
......
......@@ -717,7 +717,7 @@ _out:
return ret;
}
int32_t syncLogReplProcessReplyAsRecovery(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
SRaftId destId = pMsg->srcId;
ASSERT(pMgr->restored == false);
......@@ -820,15 +820,15 @@ int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncApp
}
if (pMgr->restored) {
(void)syncLogReplProcessReplyAsNormal(pMgr, pNode, pMsg);
(void)syncLogReplContinue(pMgr, pNode, pMsg);
} else {
(void)syncLogReplProcessReplyAsRecovery(pMgr, pNode, pMsg);
(void)syncLogReplRecover(pMgr, pNode, pMsg);
}
taosThreadMutexUnlock(&pBuf->mutex);
return 0;
}
int32_t syncLogReplDoOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if (pMgr->restored) {
(void)syncLogReplAttempt(pMgr, pNode);
} else {
......@@ -931,7 +931,7 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
return 0;
}
int32_t syncLogReplProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
int32_t syncLogReplContinue(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
ASSERT(pMgr->restored == true);
if (pMgr->startIndex <= pMsg->lastSendIndex && pMsg->lastSendIndex < pMgr->endIndex) {
if (pMgr->startIndex < pMgr->matchIndex && pMgr->retryBackoff > 0) {
......
......@@ -72,7 +72,7 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
continue;
}
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
(void)syncLogReplDoOnce(pMgr, pNode);
(void)syncLogReplStart(pMgr, pNode);
}
return 0;
}
......
......@@ -166,7 +166,7 @@ static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
char* t = NULL;
if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) {
void* payload = GET_PAYLOAD_DATA(pg);
t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
t = doCompressData(payload, pBuf->pageSize + sizeof(SFilePage), &size, pBuf);
if (size < 0) {
uError("failed to compress data when flushing data to disk, %s", pBuf->id);
terrno = TSDB_CODE_INVALID_PARA;
......
......@@ -75,4 +75,12 @@ target_link_libraries(rbtreeTest os util gtest_main)
add_test(
NAME rbtreeTest
COMMAND rbtreeTest
)
\ No newline at end of file
)
# pageBufferTest
add_executable(pageBufferTest "pageBufferTest.cpp")
target_link_libraries(pageBufferTest os util gtest_main)
add_test(
NAME pageBufferTest
COMMAND pageBufferTest
)
......@@ -157,6 +157,68 @@ void recyclePageTest() {
destroyDiskbasedBuf(pBuf);
}
int saveDataToPage(SFilePage* pPg, const char* data, uint32_t len) {
memcpy(pPg->data + pPg->num, data, len);
pPg->num += len;
setBufPageDirty(pPg, true);
return 0;
}
bool checkBufVarData(SFilePage* pPg, const char* varData, uint32_t varLen) {
const char* start = pPg->data + sizeof(SFilePage);
for (uint32_t i = 0; i < (pPg->num - sizeof(SFilePage)) / varLen; ++i) {
if (0 != strncmp(start + 6 * i + 3, varData, varLen - 3)) {
using namespace std;
cout << "pos: " << sizeof(SFilePage) + 6 * i + 3 << " should be " << varData << " but is: " << start + 6 * i + 3
<< endl;
return false;
}
}
return true;
}
// SPageInfo.pData: | sizeof(void*) 8 bytes | sizeof(SFilePage) 4 bytes| 4096 bytes |
// ^
// |
// SFilePage: flush to disk from here
void testFlushAndReadBackBuffer() {
SDiskbasedBuf* pBuf = NULL;
uint32_t totalLen = 4096;
auto code = createDiskbasedBuf(&pBuf, totalLen, totalLen * 2, "1", TD_TMP_DIR_PATH);
int32_t pageId = -1;
auto* pPg = (SFilePage*)getNewBufPage(pBuf, &pageId);
ASSERT_TRUE(pPg != nullptr);
pPg->num = sizeof(SFilePage);
// save data into page
uint32_t len = 6; // sizeof(SFilePage) + 6 * 682 = 4096
// nullbitmap(1) + len(2) + AA\0(3)
char* rowData = (char*)taosMemoryCalloc(1, len);
*(uint16_t*)(rowData + 2) = (uint16_t)2;
rowData[3] = 'A';
rowData[4] = 'A';
while (pPg->num + len <= getBufPageSize(pBuf)) {
saveDataToPage(pPg, rowData, len);
}
ASSERT_EQ(pPg->num, totalLen);
ASSERT_TRUE(checkBufVarData(pPg, rowData + 3, len));
releaseBufPage(pBuf, pPg);
// flush to disk
int32_t newPgId = -1;
pPg = (SFilePage*)getNewBufPage(pBuf, &newPgId);
releaseBufPage(pBuf, pPg);
pPg = (SFilePage*)getNewBufPage(pBuf, &newPgId);
releaseBufPage(pBuf, pPg);
// reload it from disk
pPg = (SFilePage*)getBufPage(pBuf, pageId);
ASSERT_TRUE(checkBufVarData(pPg, rowData + 3, len));
destroyDiskbasedBuf(pBuf);
}
} // namespace
TEST(testCase, resultBufferTest) {
......@@ -164,6 +226,7 @@ TEST(testCase, resultBufferTest) {
simpleTest();
writeDownTest();
recyclePageTest();
testFlushAndReadBackBuffer();
}
#pragma GCC diagnostic pop
\ No newline at end of file
#pragma GCC diagnostic pop
......@@ -1274,6 +1274,7 @@
,,y,script,./test.sh -f tsim/parser/columnValue_tinyint.sim
,,y,script,./test.sh -f tsim/parser/columnValue_unsign.sim
,,y,script,./test.sh -f tsim/parser/condition.sim
,,y,script,./test.sh -f tsim/parser/condition_scl.sim
,,y,script,./test.sh -f tsim/parser/constCol.sim
,,y,script,./test.sh -f tsim/parser/create_db.sim
,,y,script,./test.sh -f tsim/parser/create_mt.sim
......@@ -1353,6 +1354,7 @@
,,y,script,./test.sh -f tsim/query/event.sim
,,y,script,./test.sh -f tsim/query/forceFill.sim
,,y,script,./test.sh -f tsim/query/emptyTsRange.sim
,,y,script,./test.sh -f tsim/query/emptyTsRange_scl.sim
,,y,script,./test.sh -f tsim/query/partitionby.sim
,,y,script,./test.sh -f tsim/query/tableCount.sim
,,y,script,./test.sh -f tsim/query/tag_scan.sim
......
......@@ -35,7 +35,7 @@ endi
print =============== step2 drop dnode 3
sql_error drop dnode 1
sql drop dnode 3
sql drop dnode 3 force
sql select * from information_schema.ins_dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
......
......@@ -57,7 +57,7 @@ if $data(2)[7] != @status msg timeout@ then
endi
print ========== step4
sql drop dnode 2
sql drop dnode 2 force
sql select * from information_schema.ins_dnodes
if $rows != 1 then
return -1
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c filterScalarMode -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql drop database if exists cdb
sql create database if not exists cdb
sql use cdb
sql create table stb1 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(10), t3 double)
sql create table tb1 using stb1 tags(1,'1',1.0)
sql create table tb2 using stb1 tags(2,'2',2.0)
sql create table tb3 using stb1 tags(3,'3',3.0)
sql create table tb4 using stb1 tags(4,'4',4.0)
sql create table tb5 using stb1 tags(5,'5',5.0)
sql create table tb6 using stb1 tags(6,'6',6.0)
sql insert into tb1 values ('2021-05-05 18:19:00',1,1.0,1,1,1,1.0,true ,'1','1')
sql insert into tb1 values ('2021-05-05 18:19:01',2,2.0,2,2,2,2.0,true ,'2','2')
sql insert into tb1 values ('2021-05-05 18:19:02',3,3.0,3,3,3,3.0,false,'3','3')
sql insert into tb1 values ('2021-05-05 18:19:03',4,4.0,4,4,4,4.0,false,'4','4')
sql insert into tb1 values ('2021-05-05 18:19:04',11,11.0,11,11,11,11.0,true ,'11','11')
sql insert into tb1 values ('2021-05-05 18:19:05',12,12.0,12,12,12,12.0,true ,'12','12')
sql insert into tb1 values ('2021-05-05 18:19:06',13,13.0,13,13,13,13.0,false,'13','13')
sql insert into tb1 values ('2021-05-05 18:19:07',14,14.0,14,14,14,14.0,false,'14','14')
sql insert into tb2 values ('2021-05-05 18:19:08',21,21.0,21,21,21,21.0,true ,'21','21')
sql insert into tb2 values ('2021-05-05 18:19:09',22,22.0,22,22,22,22.0,true ,'22','22')
sql insert into tb2 values ('2021-05-05 18:19:10',23,23.0,23,23,23,23.0,false,'23','23')
sql insert into tb2 values ('2021-05-05 18:19:11',24,24.0,24,24,24,24.0,false,'24','24')
sql insert into tb3 values ('2021-05-05 18:19:12',31,31.0,31,31,31,31.0,true ,'31','31')
sql insert into tb3 values ('2021-05-05 18:19:13',32,32.0,32,32,32,32.0,true ,'32','32')
sql insert into tb3 values ('2021-05-05 18:19:14',33,33.0,33,33,33,33.0,false,'33','33')
sql insert into tb3 values ('2021-05-05 18:19:15',34,34.0,34,34,34,34.0,false,'34','34')
sql insert into tb4 values ('2021-05-05 18:19:16',41,41.0,41,41,41,41.0,true ,'41','41')
sql insert into tb4 values ('2021-05-05 18:19:17',42,42.0,42,42,42,42.0,true ,'42','42')
sql insert into tb4 values ('2021-05-05 18:19:18',43,43.0,43,43,43,43.0,false,'43','43')
sql insert into tb4 values ('2021-05-05 18:19:19',44,44.0,44,44,44,44.0,false,'44','44')
sql insert into tb5 values ('2021-05-05 18:19:20',51,51.0,51,51,51,51.0,true ,'51','51')
sql insert into tb5 values ('2021-05-05 18:19:21',52,52.0,52,52,52,52.0,true ,'52','52')
sql insert into tb5 values ('2021-05-05 18:19:22',53,53.0,53,53,53,53.0,false,'53','53')
sql insert into tb5 values ('2021-05-05 18:19:23',54,54.0,54,54,54,54.0,false,'54','54')
sql insert into tb6 values ('2021-05-05 18:19:24',61,61.0,61,61,61,61.0,true ,'61','61')
sql insert into tb6 values ('2021-05-05 18:19:25',62,62.0,62,62,62,62.0,true ,'62','62')
sql insert into tb6 values ('2021-05-05 18:19:26',63,63.0,63,63,63,63.0,false,'63','63')
sql insert into tb6 values ('2021-05-05 18:19:27',64,64.0,64,64,64,64.0,false,'64','64')
sql insert into tb6 values ('2021-05-05 18:19:28',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
sql create table stb2 (ts timestamp, u1 int unsigned, u2 bigint unsigned, u3 smallint unsigned, u4 tinyint unsigned, ts2 timestamp) TAGS(t1 int unsigned, t2 bigint unsigned, t3 timestamp, t4 int)
sql create table tb2_1 using stb2 tags(1,1,'2021-05-05 18:38:38',1)
sql create table tb2_2 using stb2 tags(2,2,'2021-05-05 18:58:58',2)
sql insert into tb2_1 values ('2021-05-05 18:19:00',1,2,3,4,'2021-05-05 18:28:01')
sql insert into tb2_1 values ('2021-05-05 18:19:01',5,6,7,8,'2021-05-05 18:28:02')
sql insert into tb2_1 values ('2021-05-05 18:19:02',2,2,3,4,'2021-05-05 18:28:03')
sql insert into tb2_1 values ('2021-05-05 18:19:03',5,6,7,8,'2021-05-05 18:28:04')
sql insert into tb2_1 values ('2021-05-05 18:19:04',3,2,3,4,'2021-05-05 18:28:05')
sql insert into tb2_1 values ('2021-05-05 18:19:05',5,6,7,8,'2021-05-05 18:28:06')
sql insert into tb2_1 values ('2021-05-05 18:19:06',4,2,3,4,'2021-05-05 18:28:07')
sql insert into tb2_1 values ('2021-05-05 18:19:07',5,6,7,8,'2021-05-05 18:28:08')
sql insert into tb2_1 values ('2021-05-05 18:19:08',5,2,3,4,'2021-05-05 18:28:09')
sql insert into tb2_1 values ('2021-05-05 18:19:09',5,6,7,8,'2021-05-05 18:28:10')
sql insert into tb2_1 values ('2021-05-05 18:19:10',6,2,3,4,'2021-05-05 18:28:11')
sql insert into tb2_2 values ('2021-05-05 18:19:11',5,6,7,8,'2021-05-05 18:28:12')
sql insert into tb2_2 values ('2021-05-05 18:19:12',7,2,3,4,'2021-05-05 18:28:13')
sql insert into tb2_2 values ('2021-05-05 18:19:13',5,6,7,8,'2021-05-05 18:28:14')
sql insert into tb2_2 values ('2021-05-05 18:19:14',8,2,3,4,'2021-05-05 18:28:15')
sql insert into tb2_2 values ('2021-05-05 18:19:15',5,6,7,8,'2021-05-05 18:28:16')
sql create table stb3 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(10), t3 double)
sql create table tb3_1 using stb3 tags(1,'1',1.0)
sql create table tb3_2 using stb3 tags(2,'2',2.0)
sql insert into tb3_1 values ('2021-01-05 18:19:00',1,1.0,1,1,1,1.0,true ,'1','1')
sql insert into tb3_1 values ('2021-02-05 18:19:01',2,2.0,2,2,2,2.0,true ,'2','2')
sql insert into tb3_1 values ('2021-03-05 18:19:02',3,3.0,3,3,3,3.0,false,'3','3')
sql insert into tb3_1 values ('2021-04-05 18:19:03',4,4.0,4,4,4,4.0,false,'4','4')
sql insert into tb3_1 values ('2021-05-05 18:19:28',5,NULL,5,NULL,5,NULL,true,NULL,'5')
sql insert into tb3_1 values ('2021-06-05 18:19:28',NULL,6.0,NULL,6,NULL,6.0,NULL,'6',NULL)
sql insert into tb3_1 values ('2021-07-05 18:19:28',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
sql insert into tb3_2 values ('2021-01-06 18:19:00',11,11.0,11,11,11,11.0,true ,'11','11')
sql insert into tb3_2 values ('2021-02-06 18:19:01',12,12.0,12,12,12,12.0,true ,'12','12')
sql insert into tb3_2 values ('2021-03-06 18:19:02',13,13.0,13,13,13,13.0,false,'13','13')
sql insert into tb3_2 values ('2021-04-06 18:19:03',14,14.0,14,14,14,14.0,false,'14','14')
sql insert into tb3_2 values ('2021-05-06 18:19:28',15,NULL,15,NULL,15,NULL,true,NULL,'15')
sql insert into tb3_2 values ('2021-06-06 18:19:28',NULL,16.0,NULL,16,NULL,16.0,NULL,'16',NULL)
sql insert into tb3_2 values ('2021-07-06 18:19:28',NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL,NULL)
sql create table stb4 (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9),c10 binary(16300)) TAGS(t1 int, t2 binary(10), t3 double)
sql create table tb4_0 using stb4 tags(0,'0',0.0)
sql create table tb4_1 using stb4 tags(1,'1',1.0)
sql create table tb4_2 using stb4 tags(2,'2',2.0)
sql create table tb4_3 using stb4 tags(3,'3',3.0)
sql create table tb4_4 using stb4 tags(4,'4',4.0)
$i = 0
$ts0 = 1625850000000
$blockNum = 5
$delta = 0
$tbname0 = tb4_
$a = 0
$b = 200
$c = 400
while $i < $blockNum
$x = 0
$rowNum = 1200
while $x < $rowNum
$ts = $ts0 + $x
$a = $a + 1
$b = $b + 1
$c = $c + 1
$d = $x / 10
$tin = $rowNum
$binary = 'binary . $c
$binary = $binary . '
$nchar = 'nchar . $c
$nchar = $nchar . '
$tbname = 'tb4_ . $i
$tbname = $tbname . '
sql insert into $tbname values ( $ts , $a , $b , $c , $d , $d , $c , true, $binary , $nchar , $binary )
$x = $x + 1
endw
$i = $i + 1
$ts0 = $ts0 + 259200000
endw
run tsim/parser/condition_query.sim
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
run tsim/parser/condition_query.sim
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c filterScalarMode -v 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql drop database if exists db1;
sql create database if not exists db1;
sql use db1;
sql create stable sta (ts timestamp, f1 double, f2 binary(200)) tags(t1 int);
sql create table tba1 using sta tags(1);
sql insert into tba1 values ('2022-04-26 15:15:01', 1.0, "a");
sql insert into tba1 values ('2022-04-26 15:15:02', 2.0, "b");
sql insert into tba1 values ('2022-04-26 15:15:04', 4.0, "b");
sql insert into tba1 values ('2022-04-26 15:15:05', 5.0, "b");
sql select last_row(*) from sta where ts >= 1678901803783 and ts <= 1678901803783 and _c0 <= 1678901803782 interval(10d,8d) fill(linear) order by _wstart desc;
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -20,7 +20,7 @@ sql_error alter user u2 sysinfo 0
print =============== step2 create drop dnode
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql drop dnode 3
sql drop dnode 3 force
sql alter dnode 1 'debugflag 131'
print =============== step3: select * from information_schema.ins_dnodes
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册