diff --git a/cmake/cmake.define b/cmake/cmake.define
index 0d5c21604a71fe0b6f119cf0ba29c66ebee93709..f3caf49da339d0055476df8175a3041ba7ba69e2 100644
--- a/cmake/cmake.define
+++ b/cmake/cmake.define
@@ -123,8 +123,8 @@ ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ELSE ()
- SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
- SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
+ SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -g3 -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
+ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -g3 -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ENDIF ()
# disable all assert
diff --git a/cmake/cmake.options b/cmake/cmake.options
index 1d31c94dd8635cf47683805543b053561c02ce75..7725d92abc5ff6cd00f24fe4ade815d0274de007 100644
--- a/cmake/cmake.options
+++ b/cmake/cmake.options
@@ -80,7 +80,7 @@ ENDIF ()
option(
BUILD_GEOS
"If build geos on Windows"
- ON
+ OFF
)
option(
diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt
index eb6da1c7f753f58b60b73688bba573290b4712bf..59986a3b3c4eaf9aac9711c13fb39bdd003cf698 100644
--- a/contrib/CMakeLists.txt
+++ b/contrib/CMakeLists.txt
@@ -231,6 +231,7 @@ if(${BUILD_WITH_ROCKSDB})
if(${TD_LINUX})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result")
endif(${TD_LINUX})
+ MESSAGE(STATUS "CXXXX STATUS CONFIG: " ${CMAKE_CXX_FLAGS})
if(${TD_DARWIN})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized")
@@ -252,7 +253,7 @@ if(${BUILD_WITH_ROCKSDB})
endif(${TD_DARWIN})
if(${TD_WINDOWS})
- option(WITH_JNI "" ON)
+ option(WITH_JNI "" OFF)
endif(${TD_WINDOWS})
if(${TD_WINDOWS})
@@ -264,7 +265,7 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_FALLOCATE "" OFF)
option(WITH_JEMALLOC "" OFF)
option(WITH_GFLAGS "" OFF)
- option(PORTABLE "" ON)
+ option(PORTABLE "" OFF)
option(WITH_LIBURING "" OFF)
option(FAIL_ON_WARNINGS OFF)
@@ -272,8 +273,11 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_BENCHMARK_TOOLS "" OFF)
option(WITH_TOOLS "" OFF)
option(WITH_LIBURING "" OFF)
-
+ IF (TD_LINUX)
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" ON)
+ ELSE()
+ option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
+ ENDIF()
add_subdirectory(rocksdb EXCLUDE_FROM_ALL)
target_include_directories(
rocksdb
diff --git a/docs/en/14-reference/03-connector/07-python.mdx b/docs/en/14-reference/03-connector/07-python.mdx
index a83f2047d02f7161d85a2f7ecb1c2a983ecfc22e..b263af8ea6afcebe23726d5baa8dd4246e239963 100644
--- a/docs/en/14-reference/03-connector/07-python.mdx
+++ b/docs/en/14-reference/03-connector/07-python.mdx
@@ -362,7 +362,7 @@ By using the optional req_id parameter, you can specify a request ID that can be
##### TaosConnection class
-The `TaosConnection` class contains both an implementation of the PEP249 Connection interface (e.g., the `cursor()` method and the `close()` method) and many extensions (e.g., the `execute()`, `query()`, `schemaless_insert()`, and `subscribe()` methods).
+As the way to connect introduced above but add `req_id` argument.
```python title="execute method"
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:insert}}
@@ -372,13 +372,9 @@ The `TaosConnection` class contains both an implementation of the PEP249 Connect
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:query}}
```
-:::tip
-The queried results can only be fetched once. For example, only one of `fetch_all()` and `fetch_all_into_dict()` can be used in the example above. Repeated fetches will result in an empty list.
-:::
-
##### Use of TaosResult class
-In the above example of using the `TaosConnection` class, we have shown two ways to get the result of a query: `fetch_all()` and `fetch_all_into_dict()`. In addition, `TaosResult` also provides methods to iterate through the result set by rows (`rows_iter`) or by data blocks (`blocks_iter`). Using these two methods will be more efficient in scenarios where the query has a large amount of data.
+As the way to fetch data introduced above but add `req_id` argument.
```python title="blocks_iter method"
{{#include docs/examples/python/result_set_with_req_id_examples.py}}
@@ -391,17 +387,12 @@ The `TaosConnection` class and the `TaosResult` class already implement all the
{{#include docs/examples/python/cursor_usage_native_reference_with_req_id.py}}
```
-:::note
-The TaosCursor class uses native connections for write and query operations. In a client-side multi-threaded scenario, this cursor instance must remain thread exclusive and cannot be shared across threads for use, otherwise, it will result in errors in the returned results.
-
-:::
-
##### Use of TaosRestCursor class
-The `TaosRestCursor` class is an implementation of the PEP249 Cursor interface.
+As the way to connect introduced above but add `req_id` argument.
```python title="Use of TaosRestCursor"
{{#include docs/examples/python/connect_rest_with_req_id_examples.py:basic}}
@@ -421,8 +412,11 @@ The `RestClient` class is a direct wrapper for the [REST API](/reference/rest-ap
For a more detailed description of the `sql()` method, please refer to [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html).
+
+As the way to connect introduced above but add `req_id` argument.
+
```python
{{#include docs/examples/python/connect_websocket_with_req_id_examples.py:basic}}
```
diff --git a/docs/zh/08-connector/30-python.mdx b/docs/zh/08-connector/30-python.mdx
index 1cff142e11d39e6afe86fab187697d222f37a9dd..1037d66f17e619e9b01688447320f981f3679604 100644
--- a/docs/zh/08-connector/30-python.mdx
+++ b/docs/zh/08-connector/30-python.mdx
@@ -362,7 +362,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
##### TaosConnection 类的使用
-`TaosConnection` 类既包含对 PEP249 Connection 接口的实现(如:`cursor`方法和 `close` 方法),也包含很多扩展功能(如: `execute`、 `query`、`schemaless_insert` 和 `subscribe` 方法。
+类似上文介绍的使用方法,增加 `req_id` 参数。
```python title="execute 方法"
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:insert}}
@@ -372,13 +372,9 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:query}}
```
-:::tip
-查询结果只能获取一次。比如上面的示例中 `fetch_all()` 和 `fetch_all_into_dict()` 只能用一个。重复获取得到的结果为空列表。
-:::
-
##### TaosResult 类的使用
-上面 `TaosConnection` 类的使用示例中,我们已经展示了两种获取查询结果的方法: `fetch_all()` 和 `fetch_all_into_dict()`。除此之外 `TaosResult` 还提供了按行迭代(`rows_iter`)或按数据块迭代(`blocks_iter`)结果集的方法。在查询数据量较大的场景,使用这两个方法会更高效。
+类似上文介绍的使用方法,增加 `req_id` 参数。
```python title="blocks_iter 方法"
{{#include docs/examples/python/result_set_with_req_id_examples.py}}
@@ -391,14 +387,11 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
{{#include docs/examples/python/cursor_usage_native_reference_with_req_id.py}}
```
-:::note
-TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。
-
-:::
-
+类似上文介绍的使用方法,增加 `req_id` 参数。
+
##### TaosRestCursor 类的使用
`TaosRestCursor` 类是对 PEP249 Cursor 接口的实现。
@@ -420,8 +413,11 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
+
+类似上文介绍的使用方法,增加 `req_id` 参数。
+
```python
{{#include docs/examples/python/connect_websocket_with_req_id_examples.py:basic}}
```
diff --git a/docs/zh/27-train-faq/01-faq.md b/docs/zh/27-train-faq/01-faq.md
index bf46f3ca1f7ebc6dda17e3cc30fe4d8b1a17867e..15397049dd79d5888242dd4ed17d8395f1d1096e 100644
--- a/docs/zh/27-train-faq/01-faq.md
+++ b/docs/zh/27-train-faq/01-faq.md
@@ -247,10 +247,17 @@ launchctl limit maxfiles
该提示是创建 db 的 vnode 数量不够了,需要的 vnode 不能超过了 dnode 中 vnode 的上限。因为系统默认是一个 dnode 中有 CPU 核数两倍的 vnode,也可以通过配置文件中的参数 supportVnodes 控制。
正常调大 taos.cfg 中 supportVnodes 参数即可。
-### 21 【查询】在服务器上的使用 tao-CLI 能查到指定时间段的数据,但在客户端机器上查不到?
+### 21 在服务器上的使用 taos-CLI 能查到指定时间段的数据,但在客户端机器上查不到?
这种情况是因为客户端与服务器上设置的时区不一致导致的,调整客户端与服务器的时区一致即可解决。
-### 22 【表名】表名确认是存在的,但写入或查询时报表不存在错误,非常奇怪,什么原因?
+### 22 表名确认是存在的,但在写入或查询时返回表名不存在,什么原因?
TDengine 中的所有名称,包括数据库名、表名等都是区分大小写的,如果这些名称在程序或 taos-CLI 中没有使用反引号(`)括起来使用,即使你输入的是大写的,引擎也会转化成小写来使用,如果名称前后加上了反引号,引擎就不会再转化成小写,会保持原样来使用。
+### 23 在 taos-CLI 中查询,字段内容不能完全显示出来怎么办?
+可以使用 \G 参数来竖式显示,如 show databases\G; (为了输入方便,在"\"后加 TAB 键,会自动补全后面的内容)
+### 24 使用 taosBenchmark 测试工具写入数据查询很快,为什么我写入的数据查询非常慢?
+TDengine 在写入数据时如果有很严重的乱序写入问题,会严重影响查询性能,所以需要在写入前解决乱序的问题。如果业务是从 kafka 消费写入,请合理设计消费者,尽可能的一个子表数据由一个消费者去消费并写入,避免由设计产生的乱序。
+
+### 25 我想统计下前后两条写入记录之间的时间差值是多少?
+使用 DIFF 函数,可以查看时间列或数值列前后两条记录的差值,非常方便,详细说明见 SQL手册->函数->DIFF
diff --git a/include/libs/function/function.h b/include/libs/function/function.h
index de2494ab3cae508c7a8804f01b334bac7be10587..e015f4182eb159508ba1be141df0c6807db2fcef 100644
--- a/include/libs/function/function.h
+++ b/include/libs/function/function.h
@@ -21,8 +21,8 @@ extern "C" {
#endif
#include "tcommon.h"
-#include "tvariant.h"
#include "tsimplehash.h"
+#include "tvariant.h"
struct SqlFunctionCtx;
struct SResultRowEntryInfo;
@@ -77,7 +77,7 @@ enum {
enum {
MAIN_SCAN = 0x0u,
REVERSE_SCAN = 0x1u, // todo remove it
- PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan
+ PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan
};
typedef struct SPoint1 {
@@ -130,43 +130,44 @@ typedef struct SSerializeDataHandle {
// incremental state storage
typedef struct STdbState {
- void* rocksdb;
- void** pHandle;
- void* writeOpts;
- void* readOpts;
- void** cfOpts;
- void* dbOpt;
- struct SStreamTask* pOwner;
- void* param;
- void* env;
- SListNode* pComparNode;
- void* pBackendHandle;
+ void *rocksdb;
+ void **pHandle;
+ void *writeOpts;
+ void *readOpts;
+ void **cfOpts;
+ void *dbOpt;
+ struct SStreamTask *pOwner;
+ void *param;
+ void *env;
+ SListNode *pComparNode;
+ void *pBackend;
char idstr[64];
- void* compactFactory;
-
- void* db;
- void* pStateDb;
- void* pFuncStateDb;
- void* pFillStateDb; // todo refactor
- void* pSessionStateDb;
- void* pParNameDb;
- void* pParTagDb;
- void* txn;
+ void *compactFactory;
+ TdThreadRwlock rwLock;
+
+ void *db;
+ void *pStateDb;
+ void *pFuncStateDb;
+ void *pFillStateDb; // todo refactor
+ void *pSessionStateDb;
+ void *pParNameDb;
+ void *pParTagDb;
+ void *txn;
} STdbState;
typedef struct {
- STdbState* pTdbState;
- struct SStreamFileState* pFileState;
- int32_t number;
- SSHashObj* parNameMap;
- int64_t checkPointId;
- int32_t taskId;
- int64_t streamId;
+ STdbState *pTdbState;
+ struct SStreamFileState *pFileState;
+ int32_t number;
+ SSHashObj *parNameMap;
+ int64_t checkPointId;
+ int32_t taskId;
+ int64_t streamId;
} SStreamState;
typedef struct SFunctionStateStore {
- int32_t (*streamStateFuncPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
- int32_t (*streamStateFuncGet)(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen);
+ int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen);
+ int32_t (*streamStateFuncGet)(SStreamState *pState, const SWinKey *key, void **ppVal, int32_t *pVLen);
} SFunctionStateStore;
// sql function runtime context
@@ -180,7 +181,7 @@ typedef struct SqlFunctionCtx {
int16_t functionId; // function id
char *pOutput; // final result output buffer, point to sdata->data
// input parameter, e.g., top(k, 20), the number of results of top query is kept in param
- SFunctParam *param;
+ SFunctParam *param;
// corresponding output buffer for timestamp of each result, e.g., diff/csum
SColumnInfoData *pTsOutput;
int32_t numOfParams;
diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h
index ca85622d8a2a43732acd6beeb5b96e44ba1ed323..7f9d20a9dd878892e512b170921bdb1794defc52 100644
--- a/include/libs/stream/streamState.h
+++ b/include/libs/stream/streamState.h
@@ -34,7 +34,24 @@ extern "C" {
// SListNode* streamBackendAddCompare(void* backend, void* arg);
// void streamBackendDelCompare(void* backend, void* arg);
-//typedef struct STdbState {
+// <<<<<<< HEAD
+// typedef struct STdbState {
+// rocksdb_t* rocksdb;
+// rocksdb_column_family_handle_t** pHandle;
+// rocksdb_writeoptions_t* writeOpts;
+// rocksdb_readoptions_t* readOpts;
+// rocksdb_options_t** cfOpts;
+// rocksdb_options_t* dbOpt;
+// struct SStreamTask* pOwner;
+// void* param;
+// void* env;
+// SListNode* pComparNode;
+// void* pBackend;
+// char idstr[64];
+// void* compactFactory;
+// TdThreadRwlock rwLock;
+// =======
+// typedef struct STdbState {
// rocksdb_t* rocksdb;
// rocksdb_column_family_handle_t** pHandle;
// rocksdb_writeoptions_t* writeOpts;
@@ -58,6 +75,7 @@ extern "C" {
// TTB* pParTagDb;
// TXN* txn;
//} STdbState;
+//>>>>>>> enh/dev3.0
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
void streamStateClose(SStreamState* pState, bool remove);
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index 8e7dd0bb0d0dca1f8465cc07eb5f7d9695fed267..1d4bbf073ec57efa69574d5781bf86b1e115b174 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -78,11 +78,11 @@ enum {
TASK_TRIGGER_STATUS__ACTIVE,
};
-enum {
+typedef enum {
TASK_LEVEL__SOURCE = 1,
TASK_LEVEL__AGG,
TASK_LEVEL__SINK,
-};
+} ETASK_LEVEL;
enum {
TASK_OUTPUT__FIXED_DISPATCH = 1,
@@ -284,13 +284,13 @@ struct SStreamTask {
int16_t dispatchMsgType;
SStreamStatus status;
int32_t selfChildId;
- int32_t nodeId;
+ int32_t nodeId; // vgroup id
SEpSet epSet;
SCheckpointInfo chkInfo;
STaskExec exec;
-
- // fill history
- int8_t fillHistory;
+ int8_t fillHistory; // fill history
+ int64_t ekey; // end ts key
+ int64_t endVer; // end version
// children info
SArray* childEpInfo; // SArray
@@ -351,7 +351,7 @@ typedef struct SStreamMeta {
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
-SStreamTask* tNewStreamTask(int64_t streamId);
+SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask);
diff --git a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h
index c035b6598c8b6290997e65a30f40cd28a11279b1..422bcd57ac1f3a5fa2cfe47eeb1b2c88427b2c20 100644
--- a/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h
+++ b/source/client/jni/com_taosdata_jdbc_tmq_TMQConnector.h
@@ -158,6 +158,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN
*/
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableName(JNIEnv *, jobject, jlong);
+/*
+ * Class: com_taosdata_jdbc_tmq_TMQConnector
+ * Method: tmqGetOffset
+ * Signature: (J)Ljava/lang/String;
+ */
+JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *, jobject, jlong);
+
/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: fetchBlockImp
@@ -166,6 +173,12 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *, jobject, jlong, jlong,
jobject, jobject);
+JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *, jobject, jlong, jstring, jint,
+ jlong);
+
+JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong,
+ jstring, jobject);
+
#ifdef __cplusplus
}
#endif
diff --git a/source/client/src/clientTmqConnector.c b/source/client/src/clientTmqConnector.c
index 894c51d13c9dd8e4205763893206de7ef86cdf7c..6ec82aa6ef0b5697e098d573935cec6e23e2f08c 100644
--- a/source/client/src/clientTmqConnector.c
+++ b/source/client/src/clientTmqConnector.c
@@ -17,9 +17,16 @@
#include "jniCommon.h"
#include "taos.h"
-int __init_tmq = 0;
+int __init_tmq = 0;
jmethodID g_offsetCallback;
+jclass g_assignmentClass;
+jmethodID g_assignmentConstructor;
+jmethodID g_assignmentSetVgId;
+jmethodID g_assignmentSetCurrentOffset;
+jmethodID g_assignmentSetBegin;
+jmethodID g_assignmentSetEnd;
+
void tmqGlobalMethod(JNIEnv *env) {
// make sure init function executed once
switch (atomic_val_compare_exchange_32(&__init_tmq, 0, 1)) {
@@ -46,6 +53,38 @@ void tmqGlobalMethod(JNIEnv *env) {
jniDebug("tmq method register finished");
}
+int __init_assignment = 0;
+void tmqAssignmentMethod(JNIEnv *env) {
+ // make sure init function executed once
+ switch (atomic_val_compare_exchange_32(&__init_assignment, 0, 1)) {
+ case 0:
+ break;
+ case 1:
+ do {
+ taosMsleep(0);
+ } while (atomic_load_32(&__init_assignment) == 1);
+ case 2:
+ return;
+ }
+
+ if (g_vm == NULL) {
+ (*env)->GetJavaVM(env, &g_vm);
+ }
+
+ jclass assignment = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/Assignment");
+ g_assignmentClass = (*env)->NewGlobalRef(env, assignment);
+ g_assignmentConstructor = (*env)->GetMethodID(env, g_assignmentClass, "", "()V");
+ g_assignmentSetVgId = (*env)->GetMethodID(env, g_assignmentClass, "setVgId", "(I)V"); // int
+ g_assignmentSetCurrentOffset = (*env)->GetMethodID(env, g_assignmentClass, "setCurrentOffset", "(J)V"); // long
+ g_assignmentSetBegin = (*env)->GetMethodID(env, g_assignmentClass, "setBegin", "(J)V"); // long
+ g_assignmentSetEnd = (*env)->GetMethodID(env, g_assignmentClass, "setEnd", "(J)V"); // long
+
+ (*env)->DeleteLocalRef(env, assignment);
+
+ atomic_store_32(&__init_assignment, 2);
+ jniDebug("tmq method assignment finished");
+}
+
// deprecated
void commit_cb(tmq_t *tmq, int32_t code, void *param) {
JNIEnv *env = NULL;
@@ -266,8 +305,9 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
tmq_commit_async(tmq, res, commit_cb, consumer);
}
-JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
- jlong jres, jobject offset) {
+JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj,
+ jlong jtmq, jlong jres,
+ jobject offset) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
@@ -335,7 +375,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN
TAOS_RES *res = (TAOS_RES *)jres;
if (res == NULL) {
jniDebug("jobj:%p, invalid res handle", jobj);
- return -1;
+ return JNI_RESULT_SET_NULL;
}
return tmq_get_vgroup_id(res);
}
@@ -350,6 +390,15 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam
return (*env)->NewStringUTF(env, tmq_get_table_name(res));
}
+JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *env, jobject jobj, jlong jres) {
+ TAOS_RES *res = (TAOS_RES *)jres;
+ if (res == NULL) {
+ jniDebug("jobj:%p, invalid res handle", jobj);
+ return JNI_RESULT_SET_NULL;
+ }
+ return tmq_get_vgroup_offset(res);
+}
+
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *env, jobject jobj, jlong con,
jlong res, jobject rowobj,
jobject arrayListObj) {
@@ -369,7 +418,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
jniDebug("jobj:%p, conn:%p, resultset:%p, no data to retrieve", jobj, tscon, (void *)res);
return JNI_FETCH_END;
} else {
- jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code, taos_errstr(tres));
+ jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code,
+ taos_errstr(tres));
return JNI_RESULT_SET_NULL;
}
}
@@ -399,3 +449,72 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len));
return JNI_SUCCESS;
}
+
+JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *env, jobject jobj, jlong jtmq,
+ jstring jtopic, jint partition,
+ jlong offset) {
+ tmq_t *tmq = (tmq_t *)jtmq;
+ if (tmq == NULL) {
+ jniDebug("jobj:%p, tmq is closed", jobj);
+ return TMQ_CONSUMER_NULL;
+ }
+
+ if (jtopic == NULL) {
+ jniDebug("jobj:%p, topic is null", jobj);
+ return TMQ_TOPIC_NULL;
+ }
+ const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
+
+ int32_t res = tmq_offset_seek(tmq, topicName, partition, offset);
+
+ if (res != TSDB_CODE_SUCCESS) {
+ jniError("jobj:%p, tmq seek error, code:%d, msg:%s", jobj, res, tmq_err2str(res));
+ }
+
+ (*env)->ReleaseStringUTFChars(env, jtopic, topicName);
+ return (jint)res;
+}
+
+JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *env, jobject jobj,
+ jlong jtmq, jstring jtopic,
+ jobject jarrayList) {
+ tmqAssignmentMethod(env);
+ tmq_t *tmq = (tmq_t *)jtmq;
+ if (tmq == NULL) {
+ jniDebug("jobj:%p, tmq is closed", jobj);
+ return TMQ_CONSUMER_NULL;
+ }
+
+ if (jtopic == NULL) {
+ jniDebug("jobj:%p, topic is null", jobj);
+ return TMQ_TOPIC_NULL;
+ }
+
+ const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
+
+ tmq_topic_assignment *pAssign = NULL;
+ int32_t numOfAssignment = 0;
+ int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment);
+
+ if (res != TSDB_CODE_SUCCESS) {
+ (*env)->ReleaseStringUTFChars(env, jtopic, topicName);
+ jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res,
+ tmq_err2str(res));
+ tmq_free_assignment(pAssign);
+ return (jint)res;
+ }
+
+ (*env)->ReleaseStringUTFChars(env, jtopic, topicName);
+
+ for (int i = 0; i < numOfAssignment; ++i) {
+ tmq_topic_assignment assignment = pAssign[i];
+ jobject jassignment = (*env)->NewObject(env, g_assignmentClass, g_assignmentConstructor);
+ (*env)->CallVoidMethod(env, jassignment, g_assignmentSetVgId, assignment.vgId);
+ (*env)->CallVoidMethod(env, jassignment, g_assignmentSetCurrentOffset, assignment.currentOffset);
+ (*env)->CallVoidMethod(env, jassignment, g_assignmentSetBegin, assignment.begin);
+ (*env)->CallVoidMethod(env, jassignment, g_assignmentSetEnd, assignment.end);
+ (*env)->CallBooleanMethod(env, jarrayList, g_arrayListAddFp, jassignment);
+ }
+ tmq_free_assignment(pAssign);
+ return JNI_SUCCESS;
+}
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index 7d707f4cbaa4021223e8e8ed59342453b8b87584..82b714e6eb6e3a29cc3ea16d1fb4d0c1bd9d6a6d 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -663,9 +663,10 @@ typedef struct {
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
- int32_t fixedSinkVgId; // 0 for shuffle
+
// fixedSinkVg is not applicable for encode and decode
SVgObj fixedSinkVg;
+ int32_t fixedSinkVgId; // 0 for shuffle
// transformation
char* sql;
diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c
index 734f624be0e19c942c10244f28263570d6ea4504..df8c11a6f60169390daefbb0cb3f6ca17739a9cd 100644
--- a/source/dnode/mnode/impl/src/mndScheduler.c
+++ b/source/dnode/mnode/impl/src/mndScheduler.c
@@ -14,18 +14,8 @@
*/
#include "mndScheduler.h"
-#include "mndConsumer.h"
#include "mndDb.h"
-#include "mndDnode.h"
-#include "mndMnode.h"
-#include "mndShow.h"
#include "mndSnode.h"
-#include "mndStb.h"
-#include "mndStream.h"
-#include "mndSubscribe.h"
-#include "mndTopic.h"
-#include "mndTrans.h"
-#include "mndUser.h"
#include "mndVgroup.h"
#include "parser.h"
#include "tcompare.h"
@@ -34,12 +24,8 @@
extern bool tsDeployOnSnode;
-static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
- int32_t childId = taosArrayGetSize(pArray);
- pTask->selfChildId = childId;
- taosArrayPush(pArray, &pTask);
- return 0;
-}
+static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup);
+static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark) {
@@ -97,7 +83,7 @@ END:
return terrno;
}
-int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
+int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
if (pStream->smaId != 0) {
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
@@ -106,16 +92,23 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
+ if (pTask->tbSink.pSchemaWrapper == NULL) {
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
}
+
return 0;
}
-int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
+#define SINK_NODE_LEVEL (0)
+
+int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
bool isShuffle = false;
if (pStream->fixedSinkVgId == 0) {
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
+
isShuffle = true;
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
@@ -127,47 +120,46 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
sdbRelease(pMnode->pSdb, pDb);
}
+ SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
+ int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
+
if (isShuffle) {
memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
- int32_t sz = taosArrayGetSize(pVgs);
- SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
- int32_t sinkLvSize = taosArrayGetSize(sinkLv);
- for (int32_t i = 0; i < sz; i++) {
+
+ int32_t numOfVgroups = taosArrayGetSize(pVgs);
+ for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
- for (int32_t j = 0; j < sinkLvSize; j++) {
- SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
- if (pLastLevelTask->nodeId == pVgInfo->vgId) {
- pVgInfo->taskId = pLastLevelTask->id.taskId;
+
+ for (int32_t j = 0; j < numOfSinkNodes; j++) {
+ SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
+ if (pSinkTask->nodeId == pVgInfo->vgId) {
+ pVgInfo->taskId = pSinkTask->id.taskId;
break;
}
}
}
} else {
- pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
- pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
- SArray* pArray = taosArrayGetP(pStream->tasks, 0);
- // one sink only
- SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
- pTask->fixedEpDispatcher.taskId = lastLevelTask->id.taskId;
- pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
- pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
+ SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
+ setFixedDownstreamEpInfo(pTask, pOneSinkTask);
}
+
return 0;
}
-int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
+int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
int32_t msgLen;
+
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
- plan->execNode.nodeId = pVgroup->vgId;
+ plan->execNode.nodeId = pTask->nodeId;
plan->execNode.epSet = pTask->epSet;
-
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
+
return 0;
}
@@ -210,100 +202,121 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return pVgroup;
}
+// create sink node for each vgroup.
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL;
- SArray* tasks = taosArrayGetP(pStream->tasks, 0);
while (1) {
SVgObj* pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
- if (pIter == NULL) break;
- if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
- sdbRelease(pSdb, pVgroup);
- continue;
+ if (pIter == NULL) {
+ break;
}
- SStreamTask* pTask = tNewStreamTask(pStream->uid);
- if (pTask == NULL) {
+ if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
sdbRelease(pSdb, pVgroup);
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- return -1;
+ continue;
}
- pTask->fillHistory = pStream->fillHistory;
- mndAddTaskToTaskSet(tasks, pTask);
-
- pTask->nodeId = pVgroup->vgId;
- pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
-
- // type
- pTask->taskLevel = TASK_LEVEL__SINK;
- // sink
- if (pStream->smaId != 0) {
- pTask->outputType = TASK_OUTPUT__SMA;
- pTask->smaSink.smaId = pStream->smaId;
- } else {
- pTask->outputType = TASK_OUTPUT__TABLE;
- pTask->tbSink.stbUid = pStream->targetStbUid;
- memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
- pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
- if (pTask->tbSink.pSchemaWrapper == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- return -1;
- }
- }
+ mndAddSinkTaskToStream(pStream, pMnode, pVgroup->vgId, pVgroup);
sdbRelease(pSdb, pVgroup);
}
+
return 0;
}
-int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
- SArray* tasks = taosArrayGetP(pStream->tasks, 0);
- SStreamTask* pTask = tNewStreamTask(pStream->uid);
+int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) {
+ SArray* pTaskList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
+
+ SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory, 0, pTaskList);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
- pTask->fillHistory = pStream->fillHistory;
- mndAddTaskToTaskSet(tasks, pTask);
- pTask->nodeId = pStream->fixedSinkVgId;
-#if 0
- SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
- if (pVgroup == NULL) {
- return -1;
- }
+ pTask->nodeId = vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
-#endif
- pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
+ mndSetSinkTaskInfo(pStream, pTask);
+ return 0;
+}
- pTask->taskLevel = TASK_LEVEL__SINK;
+static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStream) {
+ return 0;
+}
- // sink
- if (pStream->smaId != 0) {
- pTask->outputType = TASK_OUTPUT__SMA;
- pTask->smaSink.smaId = pStream->smaId;
+static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SStreamObj* pStream,
+ SSubplan* plan, uint64_t uid, int8_t taskLevel, int8_t fillHistory,
+ bool hasExtraSink) {
+ SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory, pStream->triggerParam, pTaskList);
+ if (pTask == NULL) {
+ return terrno;
+ }
+
+ // sink or dispatch
+ if (hasExtraSink) {
+ mndAddDispatcherForInnerTask(pMnode, pStream, pTask);
} else {
- pTask->outputType = TASK_OUTPUT__TABLE;
- pTask->tbSink.stbUid = pStream->targetStbUid;
- memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
- pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
+ mndSetSinkTaskInfo(pStream, pTask);
}
- return 0;
+ if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
+ return terrno;
+ }
+
+ return TSDB_CODE_SUCCESS;
+}
+
+static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) {
+ SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
+ if (pEpInfo == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return NULL;
+ }
+
+ pEpInfo->childId = pTask->selfChildId;
+ pEpInfo->epSet = pTask->epSet;
+ pEpInfo->nodeId = pTask->nodeId;
+ pEpInfo->taskId = pTask->id.taskId;
+
+ return pEpInfo;
+}
+
+void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
+ STaskDispatcherFixedEp* pDispatcher = &pDstTask->fixedEpDispatcher;
+ pDispatcher->taskId = pTask->id.taskId;
+ pDispatcher->nodeId = pTask->nodeId;
+ pDispatcher->epSet = pTask->epSet;
+
+ pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
+ pDstTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
+}
+
+int32_t appendToUpstream(SStreamTask* pTask, SStreamTask* pUpstream) {
+ SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask);
+ if (pEpInfo == NULL) {
+ return TSDB_CODE_OUT_OF_MEMORY;
+ }
+
+ if(pUpstream->childEpInfo == NULL) {
+ pUpstream->childEpInfo = taosArrayInit(4, POINTER_BYTES);
+ }
+
+ taosArrayPush(pUpstream->childEpInfo, &pEpInfo);
+ return TSDB_CODE_SUCCESS;
}
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
- SSdb* pSdb = pMnode->pSdb;
+ SSdb* pSdb = pMnode->pSdb;
+
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
- int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
- pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*));
+ int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
+ pStream->tasks = taosArrayInit(planTotLevel, POINTER_BYTES);
bool hasExtraSink = false;
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
@@ -313,13 +326,13 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
- bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
+ bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
sdbRelease(pSdb, pDbObj);
if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
- /*if (true) {*/
- SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
+ SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &taskOneLevel);
+
// add extra sink
hasExtraSink = true;
if (pStream->fixedSinkVgId == 0) {
@@ -328,19 +341,20 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
} else {
- if (mndAddFixedSinkTaskToStream(pMnode, pStream) < 0) {
+ if (mndAddSinkTaskToStream(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) {
// TODO free
return -1;
}
}
}
+
pStream->totalLevel = planTotLevel + hasExtraSink;
if (planTotLevel > 1) {
SStreamTask* pInnerTask;
// inner level
{
- SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*));
+ SArray* taskInnerLevel = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &taskInnerLevel);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
@@ -350,25 +364,15 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
- pInnerTask = tNewStreamTask(pStream->uid);
+ pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory, pStream->triggerParam, taskInnerLevel);
if (pInnerTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qDestroyQueryPlan(pPlan);
return -1;
}
- pInnerTask->fillHistory = pStream->fillHistory;
- mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
-
- pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));
-
- pInnerTask->taskLevel = TASK_LEVEL__AGG;
-
- // trigger
- pInnerTask->triggerParam = pStream->triggerParam;
-
// dispatch
- if (mndAddDispatcherToInnerTask(pMnode, pStream, pInnerTask) < 0) {
+ if (mndAddDispatcherForInnerTask(pMnode, pStream, pInnerTask) < 0) {
qDestroyQueryPlan(pPlan);
return -1;
}
@@ -377,7 +381,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
if (pSnode == NULL) {
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
- if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) {
+ if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
@@ -392,17 +396,18 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
} else {
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
- if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) {
+ if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
+
sdbRelease(pSdb, pVgroup);
}
}
// source level
- SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*));
+ SArray* taskSourceLevel = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &taskSourceLevel);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
@@ -416,66 +421,52 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
- if (pIter == NULL) break;
+ if (pIter == NULL) {
+ break;
+ }
+
if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
- SStreamTask* pTask = tNewStreamTask(pStream->uid);
+ SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, 0, taskSourceLevel);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
- pTask->fillHistory = pStream->fillHistory;
- mndAddTaskToTaskSet(taskSourceLevel, pTask);
- pTask->triggerParam = 0;
-
- // source
- pTask->taskLevel = TASK_LEVEL__SOURCE;
-
- // add fixed vg dispatch
- pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
- pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
+ // all the source tasks dispatch result to a single agg node.
+ setFixedDownstreamEpInfo(pTask, pInnerTask);
- pTask->fixedEpDispatcher.taskId = pInnerTask->id.taskId;
- pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId;
- pTask->fixedEpDispatcher.epSet = pInnerTask->epSet;
-
- if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
+ if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
- SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
- if (pEpInfo == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- sdbRelease(pSdb, pVgroup);
+ int32_t code = appendToUpstream(pTask, pInnerTask);
+ sdbRelease(pSdb, pVgroup);
+
+ if (code != TSDB_CODE_SUCCESS) {
+ terrno = code;
qDestroyQueryPlan(pPlan);
return -1;
}
- pEpInfo->childId = pTask->selfChildId;
- pEpInfo->epSet = pTask->epSet;
- pEpInfo->nodeId = pTask->nodeId;
- pEpInfo->taskId = pTask->id.taskId;
- taosArrayPush(pInnerTask->childEpInfo, &pEpInfo);
- sdbRelease(pSdb, pVgroup);
}
- }
-
- if (planTotLevel == 1) {
- SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
- taosArrayPush(pStream->tasks, &taskOneLevel);
+ } else if (planTotLevel == 1) {
+ // create exec stream task, since only one level, the exec task is also the source task
+ SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
+ taosArrayPush(pStream->tasks, &pTaskList);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
if (LIST_LENGTH(inner->pNodeList) != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
+
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
@@ -486,42 +477,26 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
- if (pIter == NULL) break;
- if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
- sdbRelease(pSdb, pVgroup);
- continue;
+ if (pIter == NULL) {
+ break;
}
- SStreamTask* pTask = tNewStreamTask(pStream->uid);
- if (pTask == NULL) {
+ if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup);
- qDestroyQueryPlan(pPlan);
- return -1;
+ continue;
}
- pTask->fillHistory = pStream->fillHistory;
- mndAddTaskToTaskSet(taskOneLevel, pTask);
-
- // source
- pTask->taskLevel = TASK_LEVEL__SOURCE;
-
- // trigger
- pTask->triggerParam = pStream->triggerParam;
- // sink or dispatch
- if (hasExtraSink) {
- mndAddDispatcherToInnerTask(pMnode, pStream, pTask);
- } else {
- mndAddSinkToTask(pMnode, pStream, pTask);
- }
+ // new stream task
+ int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, pStream, plan, pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, hasExtraSink);
+ sdbRelease(pSdb, pVgroup);
- if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
- sdbRelease(pSdb, pVgroup);
+ if (code != TSDB_CODE_SUCCESS) {
qDestroyQueryPlan(pPlan);
return -1;
}
- sdbRelease(pSdb, pVgroup);
}
}
+
qDestroyQueryPlan(pPlan);
return 0;
}
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 0713150b486d953ccb42eb6acd5e907251d268d6..39a1fa223f13c1ca7137172b7f55ee9e09f82817 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -700,6 +700,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (pStream->sourceDbUid == streamObj.sourceDbUid) {
++numOfStream;
}
+
sdbRelease(pMnode->pSdb, pStream);
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
@@ -723,6 +724,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
pDb = NULL;
goto _OVER;
}
+
mndReleaseDb(pMnode, pDb);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt
index bf50b03ae4b2e4d27d25306cc17a60996421e9eb..c238cb38bcd8874ca0bf2e2ee04291107e0adb57 100644
--- a/source/dnode/vnode/CMakeLists.txt
+++ b/source/dnode/vnode/CMakeLists.txt
@@ -99,6 +99,7 @@ target_include_directories(
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
)
+IF (TD_LINUX)
target_link_libraries(
vnode
PUBLIC os
@@ -119,6 +120,28 @@ target_link_libraries(
PUBLIC stream
PUBLIC index
)
+ELSE()
+target_link_libraries(
+ vnode
+ PUBLIC os
+ PUBLIC util
+ PUBLIC common
+ PUBLIC tfs
+ PUBLIC wal
+ PUBLIC qworker
+ PUBLIC sync
+ PUBLIC executor
+ PUBLIC scheduler
+ PUBLIC tdb
+
+ # PUBLIC bdb
+ # PUBLIC scalar
+ PUBLIC rocksdb
+ PUBLIC transport
+ PUBLIC stream
+ PUBLIC index
+)
+ENDIF()
IF (TD_GRANT)
TARGET_LINK_LIBRARIES(vnode PUBLIC grant)
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index 77facb0d4272f8e65fc8938cc8e878e06bb7b61c..dcfc578ac73d617a260353fa49d231630903f5e2 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -1231,7 +1231,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
streamProcessRunReq(pTask);
} else {
if (streamTaskShouldPause(&pTask->status)) {
- atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__INACTIVE);
+ atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
}
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
}
diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c
index ef77c9a7c4a919199e01e20e6fd28c59ab42c63c..6a97ea89b3ac1391cb92fe691e195dd7ad3f0cc7 100644
--- a/source/dnode/vnode/src/tsdb/tsdbCache.c
+++ b/source/dnode/vnode/src/tsdb/tsdbCache.c
@@ -128,6 +128,7 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
rocksdb_options_set_comparator(options, cmp);
rocksdb_block_based_options_set_block_cache(tableoptions, cache);
rocksdb_options_set_block_based_table_factory(options, tableoptions);
+ rocksdb_options_set_info_log_level(options, 2); // WARN_LEVEL
// rocksdb_options_set_inplace_update_support(options, 1);
// rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c
index b90e51f1bc2813ff6f4b965d75671871e71e7e78..7cba6ddf0da240ce17e2794487c0d70181cecf27 100644
--- a/source/libs/catalog/src/ctgDbg.c
+++ b/source/libs/catalog/src/ctgDbg.c
@@ -19,7 +19,7 @@
#include "trpc.h"
extern SCatalogMgmt gCtgMgmt;
-SCtgDebug gCTGDebug = {.statEnable = true};
+SCtgDebug gCTGDebug = {0};
#if 0
diff --git a/source/libs/stream/CMakeLists.txt b/source/libs/stream/CMakeLists.txt
index 72517a88daae885718cdbe8377c838ca7399b59f..fa6c709c8ffce6122fc3508eb2844042973eb5e5 100644
--- a/source/libs/stream/CMakeLists.txt
+++ b/source/libs/stream/CMakeLists.txt
@@ -6,12 +6,22 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
+
if(${BUILD_WITH_ROCKSDB})
+ IF (TD_LINUX)
target_link_libraries(
stream
PUBLIC rocksdb-shared tdb
PRIVATE os util transport qcom executor wal index
)
+ ELSE()
+ target_link_libraries(
+ stream
+ PUBLIC rocksdb tdb
+ PRIVATE os util transport qcom executor wal index
+ )
+
+ ENDIF()
target_include_directories(
stream
diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c
index 0015fea10f89f0d3a1aa6850d696e24191b6c362..f7638a42ae7d79addd638f68b8601bdbc9161579 100644
--- a/source/libs/stream/src/streamBackendRocksdb.c
+++ b/source/libs/stream/src/streamBackendRocksdb.c
@@ -22,6 +22,9 @@ typedef struct SCompactFilteFactory {
void* status;
} SCompactFilteFactory;
+typedef struct {
+ void* tableOpt;
+} RocksdbCfParam;
typedef struct {
rocksdb_t* db;
rocksdb_column_family_handle_t** pHandle;
@@ -29,12 +32,13 @@ typedef struct {
rocksdb_readoptions_t* rOpt;
rocksdb_options_t** cfOpt;
rocksdb_options_t* dbOpt;
- void* param;
- void* pBackendHandle;
+ RocksdbCfParam* param;
+ void* pBackend;
SListNode* pCompareNode;
+ rocksdb_comparator_t** pCompares;
} RocksdbCfInst;
-int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids);
+int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf);
void destroyRocksdbCfInst(RocksdbCfInst* inst);
@@ -46,9 +50,6 @@ unsigned char compactFilte(void* arg, int level, const char* key, size_t klen, c
char** newval, size_t* newvlen, unsigned char* value_changed);
rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_compactionfiltercontext_t* ctx);
-typedef struct {
- void* tableOpt;
-} RocksdbCfParam;
const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"};
typedef int (*EncodeFunc)(void* key, char* buf);
@@ -80,16 +81,16 @@ void* streamBackendInit(const char* path) {
rocksdb_env_set_low_priority_background_threads(env, 4);
rocksdb_env_set_high_priority_background_threads(env, 2);
- rocksdb_cache_t* cache = rocksdb_cache_create_lru(128 << 20);
+ rocksdb_cache_t* cache = rocksdb_cache_create_lru(64 << 20);
rocksdb_options_t* opts = rocksdb_options_create();
rocksdb_options_set_env(opts, env);
rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_options_set_create_missing_column_families(opts, 1);
- rocksdb_options_set_write_buffer_size(opts, 128 << 20);
+ rocksdb_options_set_write_buffer_size(opts, 48 << 20);
rocksdb_options_set_max_total_wal_size(opts, 128 << 20);
rocksdb_options_set_recycle_log_file_num(opts, 6);
- rocksdb_options_set_max_write_buffer_number(opts, 3);
+ rocksdb_options_set_max_write_buffer_number(opts, 2);
rocksdb_options_set_info_log_level(opts, 0);
pHandle->env = env;
@@ -114,25 +115,7 @@ void* streamBackendInit(const char* path) {
/*
list all cf and get prefix
*/
- int64_t streamId;
- int32_t taskId, dummpy = 0;
- SHashObj* tbl = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
- for (size_t i = 0; i < nCf; i++) {
- char* cf = cfs[i];
- char suffix[64] = {0};
- if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, suffix)) {
- char idstr[128] = {0};
- sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId);
- // qError("make cf name %s", idstr);
- if (taosHashGet(tbl, idstr, strlen(idstr) + 1) == NULL) {
- taosHashPut(tbl, idstr, strlen(idstr) + 1, &dummpy, sizeof(dummpy));
- }
- } else {
- continue;
- }
- }
- streamStateOpenBackendCf(pHandle, (char*)path, tbl);
- taosHashCleanup(tbl);
+ streamStateOpenBackendCf(pHandle, (char*)path, cfs, nCf);
}
rocksdb_list_column_families_destroy(cfs, nCf);
@@ -159,16 +142,17 @@ void streamBackendCleanup(void* arg) {
}
taosHashCleanup(pHandle->cfInst);
- rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
- char* err = NULL;
- rocksdb_flush(pHandle->db, flushOpt, &err);
- if (err != NULL) {
- qError("failed to flush db before streamBackend clean up, reason:%s", err);
- taosMemoryFree(err);
+ if (pHandle->db) {
+ char* err = NULL;
+ rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
+ rocksdb_flush(pHandle->db, flushOpt, &err);
+ if (err != NULL) {
+ qError("failed to flush db before streamBackend clean up, reason:%s", err);
+ taosMemoryFree(err);
+ }
+ rocksdb_flushoptions_destroy(flushOpt);
+ rocksdb_close(pHandle->db);
}
- rocksdb_flushoptions_destroy(flushOpt);
-
- rocksdb_close(pHandle->db);
rocksdb_options_destroy(pHandle->dbOpt);
rocksdb_env_destroy(pHandle->env);
rocksdb_cache_destroy(pHandle->cache);
@@ -209,12 +193,13 @@ void streamBackendDelCompare(void* backend, void* arg) {
}
void streamStateDestroy_rocksdb(SStreamState* pState, bool remove) { streamStateCloseBackend(pState, remove); }
static bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len);
-int streamGetInit(const char* funcName);
+int streamGetInit(SStreamState* pState, const char* funcName);
// |key|-----value------|
// |key|ttl|len|userData|
-static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, void** snapshot, void** readOpt);
+static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName,
+ rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
int ret = memcmp(aBuf, bBuf, aLen);
@@ -666,7 +651,7 @@ rocksdb_compactionfilter_t* compactFilteFactoryCreateFilter(void* arg, rocksdb_c
void destroyRocksdbCfInst(RocksdbCfInst* inst) {
int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
for (int i = 0; i < cfLen; i++) {
- rocksdb_column_family_handle_destroy(inst->pHandle[i]);
+ if (inst->pHandle[i]) rocksdb_column_family_handle_destroy((inst->pHandle)[i]);
}
rocksdb_writeoptions_destroy(inst->wOpt);
@@ -674,118 +659,130 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst) {
rocksdb_readoptions_destroy(inst->rOpt);
taosMemoryFree(inst->cfOpt);
- taosMemoryFree(inst->param);
taosMemoryFreeClear(inst->param);
taosMemoryFree(inst);
}
-int32_t streamStateOpenBackendCf(void* backend, char* name, SHashObj* ids) {
+int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t nCf) {
SBackendHandle* handle = backend;
char* err = NULL;
- size_t nSize = taosHashGetSize(ids);
- int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
-
- char** cfNames = taosMemoryCalloc(nSize * cfLen + 1, sizeof(char*));
- void* pIter = taosHashIterate(ids, NULL);
- size_t keyLen = 0;
- char* idstr = taosHashGetKey(pIter, &keyLen);
- for (int i = 0; i < nSize * cfLen + 1; i++) {
- cfNames[i] = (char*)taosMemoryCalloc(1, 128);
- if (i == 0) {
- memcpy(cfNames[0], "default", strlen("default"));
- continue;
- }
-
- GEN_COLUMN_FAMILY_NAME(cfNames[i], idstr, ginitDict[(i - 1) % (cfLen)].key);
- if (i % cfLen == 0) {
- pIter = taosHashIterate(ids, pIter);
- if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen);
- }
- }
- rocksdb_options_t** cfOpts = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_options_t*));
- RocksdbCfParam* params = taosMemoryCalloc(nSize * cfLen + 1, sizeof(RocksdbCfParam*));
- for (int i = 0; i < nSize * cfLen + 1; i++) {
+ int64_t streamId;
+ int32_t taskId, dummy = 0;
+ char suffix[64] = {0};
+
+ rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
+ RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*));
+ rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t**));
+ rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
+
+ for (int i = 0; i < nCf; i++) {
+ char* cf = cfs[i];
+ char funcname[64] = {0};
cfOpts[i] = rocksdb_options_create_copy(handle->dbOpt);
- if (i == 0) {
- continue;
- }
- // refactor later
- rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
- rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
+ if (i == 0) continue;
+ if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
+ rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
+ rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
- rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
- rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
+ rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
+ rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
- rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt);
- params[i].tableOpt = tableOpt;
- };
+ rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpts[i], tableOpt);
+ params[i].tableOpt = tableOpt;
- rocksdb_comparator_t** pCompare = taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_comparator_t**));
- for (int i = 0; i < nSize * cfLen + 1; i++) {
- if (i == 0) {
- continue;
- }
- SCfInit* cf = &ginitDict[(i - 1) % cfLen];
+ int idx = streamGetInit(NULL, funcname);
+ SCfInit* cfPara = &ginitDict[idx];
- rocksdb_comparator_t* compare = rocksdb_comparator_create(NULL, cf->detroyFunc, cf->cmpFunc, cf->cmpName);
- rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare);
- pCompare[i] = compare;
+ rocksdb_comparator_t* compare =
+ rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName);
+ rocksdb_options_set_comparator((rocksdb_options_t*)cfOpts[i], compare);
+ pCompare[i] = compare;
+ }
}
- rocksdb_column_family_handle_t** cfHandle =
- taosMemoryCalloc(nSize * cfLen + 1, sizeof(rocksdb_column_family_handle_t*));
- rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nSize * cfLen + 1, (const char* const*)cfNames,
+ rocksdb_t* db = rocksdb_open_column_families(handle->dbOpt, name, nCf, (const char* const*)cfs,
(const rocksdb_options_t* const*)cfOpts, cfHandle, &err);
if (err != NULL) {
qError("failed to open rocksdb cf, reason:%s", err);
taosMemoryFree(err);
} else {
- qDebug("succ to open rocksdb cf, reason:%s", err);
- }
-
- pIter = taosHashIterate(ids, NULL);
- idstr = taosHashGetKey(pIter, &keyLen);
- for (int i = 0; i < nSize; i++) {
- RocksdbCfInst* inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst));
- rocksdb_column_family_handle_t** subCf = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
- rocksdb_comparator_t** subCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
- RocksdbCfParam* subParam = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
- rocksdb_options_t** subOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
- for (int j = 0; j < cfLen; j++) {
- subCf[j] = cfHandle[i * cfLen + j + 1];
- subCompare[j] = pCompare[i * cfLen + j + 1];
- subParam[j] = params[i * cfLen + j + 1];
- subOpt[j] = cfOpts[i * cfLen + j + 1];
- }
- inst->db = db;
- inst->pHandle = subCf;
- inst->wOpt = rocksdb_writeoptions_create();
- inst->rOpt = rocksdb_readoptions_create();
- inst->cfOpt = (rocksdb_options_t**)subOpt;
- inst->dbOpt = handle->dbOpt;
- inst->param = subParam;
- inst->pBackendHandle = handle;
- handle->db = db;
- SCfComparator compare = {.comp = subCompare, .numOfComp = cfLen};
- inst->pCompareNode = streamBackendAddCompare(handle, &compare);
- rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
-
- taosHashPut(handle->cfInst, idstr, keyLen, &inst, sizeof(void*));
-
- pIter = taosHashIterate(ids, pIter);
- if (pIter != NULL) idstr = taosHashGetKey(pIter, &keyLen);
+ qDebug("succ to open rocksdb cf");
}
+ // close default cf
rocksdb_column_family_handle_destroy(cfHandle[0]);
rocksdb_options_destroy(cfOpts[0]);
+ handle->db = db;
+
+ static int32_t cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]);
+ for (int i = 0; i < nCf; i++) {
+ char* cf = cfs[i];
+ if (i == 0) continue;
+ char funcname[64] = {0};
+ if (3 == sscanf(cf, "0x%" PRIx64 "-%d_%s", &streamId, &taskId, funcname)) {
+ char idstr[128] = {0};
+ sprintf(idstr, "0x%" PRIx64 "-%d", streamId, taskId);
+
+ int idx = streamGetInit(NULL, funcname);
+
+ RocksdbCfInst* inst = NULL;
+ RocksdbCfInst** pInst = taosHashGet(handle->cfInst, idstr, strlen(idstr) + 1);
+ if (pInst == NULL || *pInst == NULL) {
+ inst = taosMemoryCalloc(1, sizeof(RocksdbCfInst));
+ inst->pHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
+ inst->cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*));
+ inst->wOpt = rocksdb_writeoptions_create();
+ inst->rOpt = rocksdb_readoptions_create();
+ inst->param = taosMemoryCalloc(cfLen, sizeof(RocksdbCfParam));
+ inst->pBackend = handle;
+ inst->db = db;
+ inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
+
+ inst->dbOpt = handle->dbOpt;
+ rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
+ taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*));
+ } else {
+ inst = *pInst;
+ }
+ inst->cfOpt[idx] = cfOpts[i];
+ inst->pCompares[idx] = pCompare[i];
+ memcpy(&(inst->param[idx]), &(params[i]), sizeof(RocksdbCfParam));
+ inst->pHandle[idx] = cfHandle[i];
+ }
+ }
+ void** pIter = taosHashIterate(handle->cfInst, NULL);
+ while (pIter) {
+ RocksdbCfInst* inst = *pIter;
+
+ for (int i = 0; i < cfLen; i++) {
+ if (inst->cfOpt[i] == NULL) {
+ rocksdb_options_t* opt = rocksdb_options_create_copy(handle->dbOpt);
+ rocksdb_block_based_table_options_t* tableOpt = rocksdb_block_based_options_create();
+ rocksdb_block_based_options_set_block_cache(tableOpt, handle->cache);
- for (int i = 0; i < nSize * cfLen + 1; i++) {
- taosMemoryFree(cfNames[i]);
+ rocksdb_filterpolicy_t* filter = rocksdb_filterpolicy_create_bloom(15);
+ rocksdb_block_based_options_set_filter_policy(tableOpt, filter);
+
+ rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)opt, tableOpt);
+
+ SCfInit* cfPara = &ginitDict[i];
+
+ rocksdb_comparator_t* compare =
+ rocksdb_comparator_create(NULL, cfPara->detroyFunc, cfPara->cmpFunc, cfPara->cmpName);
+ rocksdb_options_set_comparator((rocksdb_options_t*)opt, compare);
+
+ inst->pCompares[i] = compare;
+ inst->cfOpt[i] = opt;
+ inst->param[i].tableOpt = tableOpt;
+ }
+ }
+ SCfComparator compare = {.comp = inst->pCompares, .numOfComp = cfLen};
+ inst->pCompareNode = streamBackendAddCompare(handle, &compare);
+ pIter = taosHashIterate(handle->cfInst, pIter);
}
- taosMemoryFree(cfNames);
+
taosMemoryFree(cfHandle);
taosMemoryFree(pCompare);
taosMemoryFree(params);
taosMemoryFree(cfOpts);
-
return 0;
}
int streamStateOpenBackend(void* backend, SStreamState* pState) {
@@ -801,15 +798,14 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
pState->pTdbState->pHandle = (void**)inst->pHandle;
pState->pTdbState->writeOpts = inst->wOpt;
pState->pTdbState->readOpts = inst->rOpt;
- pState->pTdbState->cfOpts = (void**)inst->cfOpt;
+ pState->pTdbState->cfOpts = (void**)(inst->cfOpt);
pState->pTdbState->dbOpt = handle->dbOpt;
pState->pTdbState->param = inst->param;
- pState->pTdbState->pBackendHandle = handle;
+ pState->pTdbState->pBackend = handle;
pState->pTdbState->pComparNode = inst->pCompareNode;
taosThreadMutexUnlock(&handle->cfMutex);
return 0;
}
-
taosThreadMutexUnlock(&handle->cfMutex);
char* err = NULL;
@@ -839,25 +835,17 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
rocksdb_options_set_comparator((rocksdb_options_t*)cfOpt[i], compare);
pCompare[i] = compare;
}
- rocksdb_column_family_handle_t** cfHandle = taosMemoryMalloc(cfLen * sizeof(rocksdb_column_family_handle_t*));
- for (int i = 0; i < cfLen; i++) {
- char buf[128] = {0};
- GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[i].key);
- cfHandle[i] = rocksdb_create_column_family(handle->db, cfOpt[i], buf, &err);
- if (err != NULL) {
- qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
- taosMemoryFreeClear(err);
- }
- }
+ rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(cfLen, sizeof(rocksdb_column_family_handle_t*));
pState->pTdbState->rocksdb = handle->db;
pState->pTdbState->pHandle = (void**)cfHandle;
pState->pTdbState->writeOpts = rocksdb_writeoptions_create();
pState->pTdbState->readOpts = rocksdb_readoptions_create();
- pState->pTdbState->cfOpts = (void**)(rocksdb_options_t**)cfOpt;
+ pState->pTdbState->cfOpts = (void**)cfOpt;
pState->pTdbState->dbOpt = handle->dbOpt;
pState->pTdbState->param = param;
- pState->pTdbState->pBackendHandle = handle;
+ pState->pTdbState->pBackend = handle;
+ taosThreadRwlockInit(&pState->pTdbState->rwLock, NULL);
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pState->pTdbState->pComparNode = streamBackendAddCompare(handle, &compare);
// rocksdb_writeoptions_disable_WAL(pState->pTdbState->writeOpts, 1);
@@ -866,7 +854,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
}
void streamStateCloseBackend(SStreamState* pState, bool remove) {
- SBackendHandle* pHandle = pState->pTdbState->pBackendHandle;
+ SBackendHandle* pHandle = pState->pTdbState->pBackend;
taosThreadMutexLock(&pHandle->cfMutex);
RocksdbCfInst** ppInst = taosHashGet(pHandle->cfInst, pState->pTdbState->idstr, strlen(pState->pTdbState->idstr) + 1);
if (ppInst != NULL && *ppInst != NULL) {
@@ -888,7 +876,9 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
char* err = NULL;
if (remove) {
for (int i = 0; i < cfLen; i++) {
- rocksdb_drop_column_family(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[i], &err);
+ if (pState->pTdbState->pHandle[i] != NULL)
+ rocksdb_drop_column_family(pState->pTdbState->rocksdb,
+ ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[i], &err);
if (err != NULL) {
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err);
@@ -897,7 +887,8 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
} else {
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
for (int i = 0; i < cfLen; i++) {
- rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err);
+ if (pState->pTdbState->pHandle[i] != NULL)
+ rocksdb_flush_cf(pState->pTdbState->rocksdb, flushOpt, pState->pTdbState->pHandle[i], &err);
if (err != NULL) {
qError("failed to create cf:%s_%s, reason:%s", pState->pTdbState->idstr, ginitDict[i].key, err);
taosMemoryFreeClear(err);
@@ -907,7 +898,9 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
}
for (int i = 0; i < cfLen; i++) {
- rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
+ if (pState->pTdbState->pHandle[i] != NULL) {
+ rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]);
+ }
}
taosMemoryFreeClear(pState->pTdbState->pHandle);
for (int i = 0; i < cfLen; i++) {
@@ -916,7 +909,7 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
}
if (remove) {
- streamBackendDelCompare(pState->pTdbState->pBackendHandle, pState->pTdbState->pComparNode);
+ streamBackendDelCompare(pState->pTdbState->pBackend, pState->pTdbState->pComparNode);
}
rocksdb_writeoptions_destroy(pState->pTdbState->writeOpts);
pState->pTdbState->writeOpts = NULL;
@@ -925,24 +918,52 @@ void streamStateCloseBackend(SStreamState* pState, bool remove) {
pState->pTdbState->readOpts = NULL;
taosMemoryFreeClear(pState->pTdbState->cfOpts);
taosMemoryFreeClear(pState->pTdbState->param);
+
+ taosThreadRwlockDestroy(&pState->pTdbState->rwLock);
pState->pTdbState->rocksdb = NULL;
}
void streamStateDestroyCompar(void* arg) {
SCfComparator* comp = (SCfComparator*)arg;
for (int i = 0; i < comp->numOfComp; i++) {
- rocksdb_comparator_destroy(comp->comp[i]);
+ if (comp->comp[i]) rocksdb_comparator_destroy(comp->comp[i]);
}
taosMemoryFree(comp->comp);
}
-int streamGetInit(const char* funcName) {
+int streamGetInit(SStreamState* pState, const char* funcName) {
+ int idx = -1;
size_t len = strlen(funcName);
for (int i = 0; i < sizeof(ginitDict) / sizeof(ginitDict[0]); i++) {
if (len == ginitDict[i].len && strncmp(funcName, ginitDict[i].key, strlen(funcName)) == 0) {
- return i;
+ idx = i;
+ break;
}
}
- return -1;
+ if (pState != NULL && idx != -1) {
+ rocksdb_column_family_handle_t* cf = NULL;
+ taosThreadRwlockRdlock(&pState->pTdbState->rwLock);
+ cf = pState->pTdbState->pHandle[idx];
+ taosThreadRwlockUnlock(&pState->pTdbState->rwLock);
+ if (cf == NULL) {
+ char buf[128] = {0};
+ GEN_COLUMN_FAMILY_NAME(buf, pState->pTdbState->idstr, ginitDict[idx].key);
+ char* err = NULL;
+
+ taosThreadRwlockWrlock(&pState->pTdbState->rwLock);
+ cf = rocksdb_create_column_family(pState->pTdbState->rocksdb, pState->pTdbState->cfOpts[idx], buf, &err);
+ if (err != NULL) {
+ idx = -1;
+ qError("failed to to open cf, %p 0x%" PRIx64 "-%d_%s, reason:%s", pState, pState->streamId, pState->taskId,
+ funcName, err);
+ taosMemoryFree(err);
+ } else {
+ pState->pTdbState->pHandle[idx] = cf;
+ }
+ taosThreadRwlockUnlock(&pState->pTdbState->rwLock);
+ }
+ }
+
+ return idx;
}
bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len) {
rocksdb_iter_seek(iter, buf, len);
@@ -954,8 +975,9 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
}
return true;
}
-rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, void** snapshot, void** readOpt) {
- int idx = streamGetInit(cfName);
+rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot,
+ rocksdb_readoptions_t** readOpt) {
+ int idx = streamGetInit(pState, cfName);
if (snapshot != NULL) {
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
@@ -966,7 +988,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
rocksdb_readoptions_set_fill_cache(rOpt, 0);
- return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt, pState->pTdbState->pHandle[idx]);
+ return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt,
+ ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]);
}
#define STREAM_STATE_PUT_ROCKSDB(pState, funcname, key, value, vLen) \
@@ -974,7 +997,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
- int i = streamGetInit(funcname); \
+ int i = streamGetInit(pState, funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
@@ -983,11 +1006,12 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
- rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
- rocksdb_t* db = pState->pTdbState->rocksdb; \
- rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
- char* ttlV = NULL; \
- int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
+ rocksdb_column_family_handle_t* pHandle = \
+ ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \
+ rocksdb_t* db = pState->pTdbState->rocksdb; \
+ rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
+ char* ttlV = NULL; \
+ int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
if (err != NULL) { \
taosMemoryFree(err); \
@@ -1004,7 +1028,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
- int i = streamGetInit(funcname); \
+ int i = streamGetInit(pState, funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s", funcname); \
code = -1; \
@@ -1013,11 +1037,12 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
- rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
- rocksdb_t* db = pState->pTdbState->rocksdb; \
- rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
- size_t len = 0; \
- char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
+ rocksdb_column_family_handle_t* pHandle = \
+ ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \
+ rocksdb_t* db = pState->pTdbState->rocksdb; \
+ rocksdb_readoptions_t* opts = pState->pTdbState->readOpts; \
+ size_t len = 0; \
+ char* val = rocksdb_get_cf(db, opts, pHandle, (const char*)buf, klen, (size_t*)&len, &err); \
if (val == NULL) { \
if (err == NULL) { \
qDebug("streamState str: %s failed to read from %s_%s, err: not exist", toString, pState->pTdbState->idstr, \
@@ -1051,7 +1076,7 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
code = 0; \
char buf[128] = {0}; \
char* err = NULL; \
- int i = streamGetInit(funcname); \
+ int i = streamGetInit(pState, funcname); \
if (i < 0) { \
qWarn("streamState failed to get cf name: %s_%s", pState->pTdbState->idstr, funcname); \
code = -1; \
@@ -1060,9 +1085,10 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
char toString[128] = {0}; \
if (qDebugFlag & DEBUG_TRACE) ginitDict[i].toStrFunc((void*)key, toString); \
int32_t klen = ginitDict[i].enFunc((void*)key, buf); \
- rocksdb_column_family_handle_t* pHandle = pState->pTdbState->pHandle[ginitDict[i].idx]; \
- rocksdb_t* db = pState->pTdbState->rocksdb; \
- rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
+ rocksdb_column_family_handle_t* pHandle = \
+ ((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[ginitDict[i].idx]; \
+ rocksdb_t* db = pState->pTdbState->rocksdb; \
+ rocksdb_writeoptions_t* opts = pState->pTdbState->writeOpts; \
rocksdb_delete_cf(db, opts, pHandle, (const char*)buf, klen, &err); \
if (err != NULL) { \
qError("streamState str: %s failed to del from %s_%s, err: %s", toString, pState->pTdbState->idstr, funcname, \
@@ -1113,8 +1139,10 @@ int32_t streamStateClear_rocksdb(SStreamState* pState) {
}
char* err = NULL;
- rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1],
- sKeyStr, sLen, eKeyStr, eLen, &err);
+ if (pState->pTdbState->pHandle[1] != NULL) {
+ rocksdb_delete_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->writeOpts, pState->pTdbState->pHandle[1],
+ sKeyStr, sLen, eKeyStr, eLen, &err);
+ }
// rocksdb_compact_range_cf(pState->pTdbState->rocksdb, pState->pTdbState->pHandle[0], sKeyStr, sLen, eKeyStr,
// eLen);
if (err != NULL) {
@@ -1214,7 +1242,8 @@ SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWin
}
pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb;
- pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
+ pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
+ (rocksdb_readoptions_t**)&pCur->readOpt);
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
@@ -1254,7 +1283,8 @@ SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState, const SWinK
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) return NULL;
pCur->db = pState->pTdbState->rocksdb;
- pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
+ pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
+ (rocksdb_readoptions_t**)&pCur->readOpt);
rocksdb_iter_seek(pCur->iter, buf, (size_t)klen);
rocksdb_iter_prev(pCur->iter);
@@ -1276,7 +1306,8 @@ SStreamStateCur* streamStateGetCur_rocksdb(SStreamState* pState, const SWinKey*
if (pCur == NULL) return NULL;
pCur->db = pState->pTdbState->rocksdb;
- pCur->iter = streamStateIterCreate(pState, "state", &pCur->snapshot, &pCur->readOpt);
+ pCur->iter = streamStateIterCreate(pState, "state", (rocksdb_snapshot_t**)&pCur->snapshot,
+ (rocksdb_readoptions_t**)&pCur->readOpt);
SStateKey sKey = {.key = *key, .opNum = pState->number};
char buf[128] = {0};
@@ -1368,7 +1399,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentPrev_rocksdb(SStreamState* pSta
}
pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb;
- pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
+ pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
+ (rocksdb_readoptions_t**)&pCur->readOpt);
char buf[128] = {0};
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
@@ -1407,7 +1439,8 @@ SStreamStateCur* streamStateSessionSeekKeyCurrentNext_rocksdb(SStreamState* pSta
return NULL;
}
pCur->db = pState->pTdbState->rocksdb;
- pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
+ pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
+ (rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
char buf[128] = {0};
@@ -1443,7 +1476,8 @@ SStreamStateCur* streamStateSessionSeekKeyNext_rocksdb(SStreamState* pState, con
return NULL;
}
pCur->db = pState->pTdbState->rocksdb;
- pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
+ pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
+ (rocksdb_readoptions_t**)&pCur->readOpt);
pCur->number = pState->number;
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
@@ -1535,7 +1569,8 @@ SStreamStateCur* streamStateFillGetCur_rocksdb(SStreamState* pState, const SWinK
if (pCur == NULL) return NULL;
pCur->db = pState->pTdbState->rocksdb;
- pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
+ pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
+ (rocksdb_readoptions_t**)&pCur->readOpt);
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
@@ -1594,7 +1629,8 @@ SStreamStateCur* streamStateFillSeekKeyNext_rocksdb(SStreamState* pState, const
}
pCur->db = pState->pTdbState->rocksdb;
- pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
+ pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
+ (rocksdb_readoptions_t**)&pCur->readOpt);
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
@@ -1629,7 +1665,8 @@ SStreamStateCur* streamStateFillSeekKeyPrev_rocksdb(SStreamState* pState, const
}
pCur->db = pState->pTdbState->rocksdb;
- pCur->iter = streamStateIterCreate(pState, "fill", &pCur->snapshot, &pCur->readOpt);
+ pCur->iter = streamStateIterCreate(pState, "fill", (rocksdb_snapshot_t**)&pCur->snapshot,
+ (rocksdb_readoptions_t**)&pCur->readOpt);
char buf[128] = {0};
int len = winKeyEncode((void*)key, buf);
@@ -1664,7 +1701,8 @@ int32_t streamStateSessionGetKeyByRange_rocksdb(SStreamState* pState, const SSes
}
pCur->number = pState->number;
pCur->db = pState->pTdbState->rocksdb;
- pCur->iter = streamStateIterCreate(pState, "sess", &pCur->snapshot, &pCur->readOpt);
+ pCur->iter = streamStateIterCreate(pState, "sess", (rocksdb_snapshot_t**)&pCur->snapshot,
+ (rocksdb_readoptions_t**)&pCur->readOpt);
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
int32_t c = 0;
@@ -1895,7 +1933,7 @@ int32_t streamDefaultIterGet_rocksdb(SStreamState* pState, const void* start, co
rocksdb_snapshot_t* snapshot = NULL;
rocksdb_readoptions_t* readopts = NULL;
- rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", (void**)&snapshot, (void**)&readopts);
+ rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts);
if (pIter == NULL) {
return -1;
}
@@ -1934,7 +1972,8 @@ void* streamDefaultIterCreate_rocksdb(SStreamState* pState) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
pCur->db = pState->pTdbState->rocksdb;
- pCur->iter = streamStateIterCreate(pState, "default", &pCur->snapshot, &pCur->readOpt);
+ pCur->iter = streamStateIterCreate(pState, "default", (rocksdb_snapshot_t**)&pCur->snapshot,
+ (rocksdb_readoptions_t**)&pCur->readOpt);
return pCur;
}
int32_t streamDefaultIterValid_rocksdb(void* iter) {
@@ -1965,7 +2004,6 @@ char* streamDefaultIterVal_rocksdb(void* iter, int32_t* len) {
}
return dst;
}
-
// batch func
void* streamStateCreateBatch() {
rocksdb_writebatch_t* pBatch = rocksdb_writebatch_create();
@@ -1980,7 +2018,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
int32_t streamStatePutBatch(SStreamState* pState, const char* cfName, rocksdb_writebatch_t* pBatch, void* key,
void* val, int32_t vlen, int64_t ttl) {
- int i = streamGetInit(cfName);
+ int i = streamGetInit(pState, cfName);
if (i < 0) {
qError("streamState failed to put to cf name:%s", cfName);
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index 8a038969785b9a4f2a64436cca1953d95805d128..a0caffd41fc33c16de5e3b6cbe078d8b14cad94e 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -17,14 +17,25 @@
#include "tstream.h"
#include "wal.h"
-SStreamTask* tNewStreamTask(int64_t streamId) {
+static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) {
+ int32_t childId = taosArrayGetSize(pArray);
+ pTask->selfChildId = childId;
+ taosArrayPush(pArray, &pTask);
+ return 0;
+}
+
+SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pTask->id.taskId = tGenIdPI32();
pTask->id.streamId = streamId;
+ pTask->taskLevel = taskLevel;
+ pTask->fillHistory = fillHistory;
+ pTask->triggerParam = triggerParam;
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);
@@ -34,6 +45,7 @@ SStreamTask* tNewStreamTask(int64_t streamId) {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
+ mndAddToTaskset(pTaskList, pTask);
return pTask;
}