提交 9222d965 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize

......@@ -123,8 +123,8 @@ ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -O3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -g3 -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-reserved-user-defined-literal -g3 -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -Wformat=2 -Wno-format-nonliteral -Wno-format-truncation -Wno-format-y2k")
ENDIF ()
# disable all assert
......
......@@ -80,7 +80,7 @@ ENDIF ()
option(
BUILD_GEOS
"If build geos on Windows"
ON
OFF
)
option(
......
......@@ -231,6 +231,7 @@ if(${BUILD_WITH_ROCKSDB})
if(${TD_LINUX})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result")
endif(${TD_LINUX})
MESSAGE(STATUS "CXXXX STATUS CONFIG: " ${CMAKE_CXX_FLAGS})
if(${TD_DARWIN})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized")
......@@ -252,7 +253,7 @@ if(${BUILD_WITH_ROCKSDB})
endif(${TD_DARWIN})
if(${TD_WINDOWS})
option(WITH_JNI "" ON)
option(WITH_JNI "" OFF)
endif(${TD_WINDOWS})
if(${TD_WINDOWS})
......@@ -264,7 +265,7 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_FALLOCATE "" OFF)
option(WITH_JEMALLOC "" OFF)
option(WITH_GFLAGS "" OFF)
option(PORTABLE "" ON)
option(PORTABLE "" OFF)
option(WITH_LIBURING "" OFF)
option(FAIL_ON_WARNINGS OFF)
......@@ -272,8 +273,11 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_BENCHMARK_TOOLS "" OFF)
option(WITH_TOOLS "" OFF)
option(WITH_LIBURING "" OFF)
IF (TD_LINUX)
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" ON)
ELSE()
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
ENDIF()
add_subdirectory(rocksdb EXCLUDE_FROM_ALL)
target_include_directories(
rocksdb
......
......@@ -362,7 +362,7 @@ By using the optional req_id parameter, you can specify a request ID that can be
##### TaosConnection class
The `TaosConnection` class contains both an implementation of the PEP249 Connection interface (e.g., the `cursor()` method and the `close()` method) and many extensions (e.g., the `execute()`, `query()`, `schemaless_insert()`, and `subscribe()` methods).
As the way to connect introduced above but add `req_id` argument.
```python title="execute method"
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:insert}}
......@@ -372,13 +372,9 @@ The `TaosConnection` class contains both an implementation of the PEP249 Connect
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:query}}
```
:::tip
The queried results can only be fetched once. For example, only one of `fetch_all()` and `fetch_all_into_dict()` can be used in the example above. Repeated fetches will result in an empty list.
:::
##### Use of TaosResult class
In the above example of using the `TaosConnection` class, we have shown two ways to get the result of a query: `fetch_all()` and `fetch_all_into_dict()`. In addition, `TaosResult` also provides methods to iterate through the result set by rows (`rows_iter`) or by data blocks (`blocks_iter`). Using these two methods will be more efficient in scenarios where the query has a large amount of data.
As the way to fetch data introduced above but add `req_id` argument.
```python title="blocks_iter method"
{{#include docs/examples/python/result_set_with_req_id_examples.py}}
......@@ -391,17 +387,12 @@ The `TaosConnection` class and the `TaosResult` class already implement all the
{{#include docs/examples/python/cursor_usage_native_reference_with_req_id.py}}
```
:::note
The TaosCursor class uses native connections for write and query operations. In a client-side multi-threaded scenario, this cursor instance must remain thread exclusive and cannot be shared across threads for use, otherwise, it will result in errors in the returned results.
:::
</TabItem>
<TabItem value="rest" label="REST connection">
##### Use of TaosRestCursor class
The `TaosRestCursor` class is an implementation of the PEP249 Cursor interface.
As the way to connect introduced above but add `req_id` argument.
```python title="Use of TaosRestCursor"
{{#include docs/examples/python/connect_rest_with_req_id_examples.py:basic}}
......@@ -421,8 +412,11 @@ The `RestClient` class is a direct wrapper for the [REST API](/reference/rest-ap
For a more detailed description of the `sql()` method, please refer to [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html).
</TabItem>
<TabItem value="websocket" label="WebSocket connection">
As the way to connect introduced above but add `req_id` argument.
```python
{{#include docs/examples/python/connect_websocket_with_req_id_examples.py:basic}}
```
......
......@@ -362,7 +362,7 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
##### TaosConnection 类的使用
`TaosConnection` 类既包含对 PEP249 Connection 接口的实现(如:`cursor`方法和 `close` 方法),也包含很多扩展功能(如: `execute`、 `query`、`schemaless_insert` 和 `subscribe` 方法
类似上文介绍的使用方法,增加 `req_id` 参数
```python title="execute 方法"
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:insert}}
......@@ -372,13 +372,9 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
{{#include docs/examples/python/connection_usage_native_reference_with_req_id.py:query}}
```
:::tip
查询结果只能获取一次。比如上面的示例中 `fetch_all()` 和 `fetch_all_into_dict()` 只能用一个。重复获取得到的结果为空列表。
:::
##### TaosResult 类的使用
上面 `TaosConnection` 类的使用示例中,我们已经展示了两种获取查询结果的方法: `fetch_all()` 和 `fetch_all_into_dict()`。除此之外 `TaosResult` 还提供了按行迭代(`rows_iter`)或按数据块迭代(`blocks_iter`)结果集的方法。在查询数据量较大的场景,使用这两个方法会更高效
类似上文介绍的使用方法,增加 `req_id` 参数
```python title="blocks_iter 方法"
{{#include docs/examples/python/result_set_with_req_id_examples.py}}
......@@ -391,14 +387,11 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
{{#include docs/examples/python/cursor_usage_native_reference_with_req_id.py}}
```
:::note
TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线程的场景下,这个游标实例必须保持线程独享,不能跨线程共享使用,否则会导致返回结果出现错误。
:::
</TabItem>
<TabItem value="rest" label="REST 连接">
类似上文介绍的使用方法,增加 `req_id` 参数。
##### TaosRestCursor 类的使用
`TaosRestCursor` 类是对 PEP249 Cursor 接口的实现。
......@@ -420,8 +413,11 @@ TaosCursor 类使用原生连接进行写入、查询操作。在客户端多线
对于 `sql()` 方法更详细的介绍, 请参考 [RestClient](https://docs.taosdata.com/api/taospy/taosrest/restclient.html)。
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
类似上文介绍的使用方法,增加 `req_id` 参数。
```python
{{#include docs/examples/python/connect_websocket_with_req_id_examples.py:basic}}
```
......
......@@ -247,10 +247,17 @@ launchctl limit maxfiles
该提示是创建 db 的 vnode 数量不够了,需要的 vnode 不能超过了 dnode 中 vnode 的上限。因为系统默认是一个 dnode 中有 CPU 核数两倍的 vnode,也可以通过配置文件中的参数 supportVnodes 控制。
正常调大 taos.cfg 中 supportVnodes 参数即可。
### 21 【查询】在服务器上的使用 tao-CLI 能查到指定时间段的数据,但在客户端机器上查不到?
### 21 在服务器上的使用 taos-CLI 能查到指定时间段的数据,但在客户端机器上查不到?
这种情况是因为客户端与服务器上设置的时区不一致导致的,调整客户端与服务器的时区一致即可解决。
### 22 【表名】表名确认是存在的,但写入或查询时报表不存在错误,非常奇怪,什么原因?
### 22 表名确认是存在的,但在写入或查询时返回表名不存在,什么原因?
TDengine 中的所有名称,包括数据库名、表名等都是区分大小写的,如果这些名称在程序或 taos-CLI 中没有使用反引号(`)括起来使用,即使你输入的是大写的,引擎也会转化成小写来使用,如果名称前后加上了反引号,引擎就不会再转化成小写,会保持原样来使用。
### 23 在 taos-CLI 中查询,字段内容不能完全显示出来怎么办?
可以使用 \G 参数来竖式显示,如 show databases\G; (为了输入方便,在"\"后加 TAB 键,会自动补全后面的内容)
### 24 使用 taosBenchmark 测试工具写入数据查询很快,为什么我写入的数据查询非常慢?
TDengine 在写入数据时如果有很严重的乱序写入问题,会严重影响查询性能,所以需要在写入前解决乱序的问题。如果业务是从 kafka 消费写入,请合理设计消费者,尽可能的一个子表数据由一个消费者去消费并写入,避免由设计产生的乱序。
### 25 我想统计下前后两条写入记录之间的时间差值是多少?
使用 DIFF 函数,可以查看时间列或数值列前后两条记录的差值,非常方便,详细说明见 SQL手册->函数->DIFF
......@@ -21,8 +21,8 @@ extern "C" {
#endif
#include "tcommon.h"
#include "tvariant.h"
#include "tsimplehash.h"
#include "tvariant.h"
struct SqlFunctionCtx;
struct SResultRowEntryInfo;
......@@ -77,7 +77,7 @@ enum {
enum {
MAIN_SCAN = 0x0u,
REVERSE_SCAN = 0x1u, // todo remove it
PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan
PRE_SCAN = 0x2u, // pre-scan belongs to the main scan and occurs before main scan
};
typedef struct SPoint1 {
......@@ -130,43 +130,44 @@ typedef struct SSerializeDataHandle {
// incremental state storage
typedef struct STdbState {
void* rocksdb;
void** pHandle;
void* writeOpts;
void* readOpts;
void** cfOpts;
void* dbOpt;
struct SStreamTask* pOwner;
void* param;
void* env;
SListNode* pComparNode;
void* pBackendHandle;
void *rocksdb;
void **pHandle;
void *writeOpts;
void *readOpts;
void **cfOpts;
void *dbOpt;
struct SStreamTask *pOwner;
void *param;
void *env;
SListNode *pComparNode;
void *pBackend;
char idstr[64];
void* compactFactory;
void* db;
void* pStateDb;
void* pFuncStateDb;
void* pFillStateDb; // todo refactor
void* pSessionStateDb;
void* pParNameDb;
void* pParTagDb;
void* txn;
void *compactFactory;
TdThreadRwlock rwLock;
void *db;
void *pStateDb;
void *pFuncStateDb;
void *pFillStateDb; // todo refactor
void *pSessionStateDb;
void *pParNameDb;
void *pParTagDb;
void *txn;
} STdbState;
typedef struct {
STdbState* pTdbState;
struct SStreamFileState* pFileState;
int32_t number;
SSHashObj* parNameMap;
int64_t checkPointId;
int32_t taskId;
int64_t streamId;
STdbState *pTdbState;
struct SStreamFileState *pFileState;
int32_t number;
SSHashObj *parNameMap;
int64_t checkPointId;
int32_t taskId;
int64_t streamId;
} SStreamState;
typedef struct SFunctionStateStore {
int32_t (*streamStateFuncPut)(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
int32_t (*streamStateFuncGet)(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen);
int32_t (*streamStateFuncPut)(SStreamState *pState, const SWinKey *key, const void *value, int32_t vLen);
int32_t (*streamStateFuncGet)(SStreamState *pState, const SWinKey *key, void **ppVal, int32_t *pVLen);
} SFunctionStateStore;
// sql function runtime context
......@@ -180,7 +181,7 @@ typedef struct SqlFunctionCtx {
int16_t functionId; // function id
char *pOutput; // final result output buffer, point to sdata->data
// input parameter, e.g., top(k, 20), the number of results of top query is kept in param
SFunctParam *param;
SFunctParam *param;
// corresponding output buffer for timestamp of each result, e.g., diff/csum
SColumnInfoData *pTsOutput;
int32_t numOfParams;
......
......@@ -34,7 +34,24 @@ extern "C" {
// SListNode* streamBackendAddCompare(void* backend, void* arg);
// void streamBackendDelCompare(void* backend, void* arg);
//typedef struct STdbState {
// <<<<<<< HEAD
// typedef struct STdbState {
// rocksdb_t* rocksdb;
// rocksdb_column_family_handle_t** pHandle;
// rocksdb_writeoptions_t* writeOpts;
// rocksdb_readoptions_t* readOpts;
// rocksdb_options_t** cfOpts;
// rocksdb_options_t* dbOpt;
// struct SStreamTask* pOwner;
// void* param;
// void* env;
// SListNode* pComparNode;
// void* pBackend;
// char idstr[64];
// void* compactFactory;
// TdThreadRwlock rwLock;
// =======
// typedef struct STdbState {
// rocksdb_t* rocksdb;
// rocksdb_column_family_handle_t** pHandle;
// rocksdb_writeoptions_t* writeOpts;
......@@ -58,6 +75,7 @@ extern "C" {
// TTB* pParTagDb;
// TXN* txn;
//} STdbState;
//>>>>>>> enh/dev3.0
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
void streamStateClose(SStreamState* pState, bool remove);
......
......@@ -78,11 +78,11 @@ enum {
TASK_TRIGGER_STATUS__ACTIVE,
};
enum {
typedef enum {
TASK_LEVEL__SOURCE = 1,
TASK_LEVEL__AGG,
TASK_LEVEL__SINK,
};
} ETASK_LEVEL;
enum {
TASK_OUTPUT__FIXED_DISPATCH = 1,
......@@ -284,13 +284,13 @@ struct SStreamTask {
int16_t dispatchMsgType;
SStreamStatus status;
int32_t selfChildId;
int32_t nodeId;
int32_t nodeId; // vgroup id
SEpSet epSet;
SCheckpointInfo chkInfo;
STaskExec exec;
// fill history
int8_t fillHistory;
int8_t fillHistory; // fill history
int64_t ekey; // end ts key
int64_t endVer; // end version
// children info
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
......@@ -351,7 +351,7 @@ typedef struct SStreamMeta {
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewStreamTask(int64_t streamId);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask);
......
......@@ -158,6 +158,13 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN
*/
JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableName(JNIEnv *, jobject, jlong);
/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: tmqGetOffset
* Signature: (J)Ljava/lang/String;
*/
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *, jobject, jlong);
/*
* Class: com_taosdata_jdbc_tmq_TMQConnector
* Method: fetchBlockImp
......@@ -166,6 +173,12 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *, jobject, jlong, jlong,
jobject, jobject);
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *, jobject, jlong, jstring, jint,
jlong);
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *, jobject, jlong,
jstring, jobject);
#ifdef __cplusplus
}
#endif
......
......@@ -17,9 +17,16 @@
#include "jniCommon.h"
#include "taos.h"
int __init_tmq = 0;
int __init_tmq = 0;
jmethodID g_offsetCallback;
jclass g_assignmentClass;
jmethodID g_assignmentConstructor;
jmethodID g_assignmentSetVgId;
jmethodID g_assignmentSetCurrentOffset;
jmethodID g_assignmentSetBegin;
jmethodID g_assignmentSetEnd;
void tmqGlobalMethod(JNIEnv *env) {
// make sure init function executed once
switch (atomic_val_compare_exchange_32(&__init_tmq, 0, 1)) {
......@@ -46,6 +53,38 @@ void tmqGlobalMethod(JNIEnv *env) {
jniDebug("tmq method register finished");
}
int __init_assignment = 0;
void tmqAssignmentMethod(JNIEnv *env) {
// make sure init function executed once
switch (atomic_val_compare_exchange_32(&__init_assignment, 0, 1)) {
case 0:
break;
case 1:
do {
taosMsleep(0);
} while (atomic_load_32(&__init_assignment) == 1);
case 2:
return;
}
if (g_vm == NULL) {
(*env)->GetJavaVM(env, &g_vm);
}
jclass assignment = (*env)->FindClass(env, "com/taosdata/jdbc/tmq/Assignment");
g_assignmentClass = (*env)->NewGlobalRef(env, assignment);
g_assignmentConstructor = (*env)->GetMethodID(env, g_assignmentClass, "<init>", "()V");
g_assignmentSetVgId = (*env)->GetMethodID(env, g_assignmentClass, "setVgId", "(I)V"); // int
g_assignmentSetCurrentOffset = (*env)->GetMethodID(env, g_assignmentClass, "setCurrentOffset", "(J)V"); // long
g_assignmentSetBegin = (*env)->GetMethodID(env, g_assignmentClass, "setBegin", "(J)V"); // long
g_assignmentSetEnd = (*env)->GetMethodID(env, g_assignmentClass, "setEnd", "(J)V"); // long
(*env)->DeleteLocalRef(env, assignment);
atomic_store_32(&__init_assignment, 2);
jniDebug("tmq method assignment finished");
}
// deprecated
void commit_cb(tmq_t *tmq, int32_t code, void *param) {
JNIEnv *env = NULL;
......@@ -266,8 +305,9 @@ JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqCommitAsync(JN
tmq_commit_async(tmq, res, commit_cb, consumer);
}
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj, jlong jtmq,
jlong jres, jobject offset) {
JNIEXPORT void JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_consumerCommitAsync(JNIEnv *env, jobject jobj,
jlong jtmq, jlong jres,
jobject offset) {
tmqGlobalMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
......@@ -335,7 +375,7 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetVgroupId(JN
TAOS_RES *res = (TAOS_RES *)jres;
if (res == NULL) {
jniDebug("jobj:%p, invalid res handle", jobj);
return -1;
return JNI_RESULT_SET_NULL;
}
return tmq_get_vgroup_id(res);
}
......@@ -350,6 +390,15 @@ JNIEXPORT jstring JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTableNam
return (*env)->NewStringUTF(env, tmq_get_table_name(res));
}
JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetOffset(JNIEnv *env, jobject jobj, jlong jres) {
TAOS_RES *res = (TAOS_RES *)jres;
if (res == NULL) {
jniDebug("jobj:%p, invalid res handle", jobj);
return JNI_RESULT_SET_NULL;
}
return tmq_get_vgroup_offset(res);
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(JNIEnv *env, jobject jobj, jlong con,
jlong res, jobject rowobj,
jobject arrayListObj) {
......@@ -369,7 +418,8 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
jniDebug("jobj:%p, conn:%p, resultset:%p, no data to retrieve", jobj, tscon, (void *)res);
return JNI_FETCH_END;
} else {
jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code, taos_errstr(tres));
jniError("jobj:%p, conn:%p, query interrupted, tmq fetch block error code:%d, msg:%s", jobj, tscon, error_code,
taos_errstr(tres));
return JNI_RESULT_SET_NULL;
}
}
......@@ -399,3 +449,72 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_fetchRawBlockImp(
(*env)->CallVoidMethod(env, rowobj, g_blockdataSetByteArrayFp, jniFromNCharToByteArray(env, (char *)data, len));
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqSeekImp(JNIEnv *env, jobject jobj, jlong jtmq,
jstring jtopic, jint partition,
jlong offset) {
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
int32_t res = tmq_offset_seek(tmq, topicName, partition, offset);
if (res != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, tmq seek error, code:%d, msg:%s", jobj, res, tmq_err2str(res));
}
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
return (jint)res;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_tmq_TMQConnector_tmqGetTopicAssignmentImp(JNIEnv *env, jobject jobj,
jlong jtmq, jstring jtopic,
jobject jarrayList) {
tmqAssignmentMethod(env);
tmq_t *tmq = (tmq_t *)jtmq;
if (tmq == NULL) {
jniDebug("jobj:%p, tmq is closed", jobj);
return TMQ_CONSUMER_NULL;
}
if (jtopic == NULL) {
jniDebug("jobj:%p, topic is null", jobj);
return TMQ_TOPIC_NULL;
}
const char *topicName = (*env)->GetStringUTFChars(env, jtopic, NULL);
tmq_topic_assignment *pAssign = NULL;
int32_t numOfAssignment = 0;
int32_t res = tmq_get_topic_assignment(tmq, topicName, &pAssign, &numOfAssignment);
if (res != TSDB_CODE_SUCCESS) {
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
jniError("jobj:%p, tmq get topic assignment error, topic:%s, code:%d, msg:%s", jobj, topicName, res,
tmq_err2str(res));
tmq_free_assignment(pAssign);
return (jint)res;
}
(*env)->ReleaseStringUTFChars(env, jtopic, topicName);
for (int i = 0; i < numOfAssignment; ++i) {
tmq_topic_assignment assignment = pAssign[i];
jobject jassignment = (*env)->NewObject(env, g_assignmentClass, g_assignmentConstructor);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetVgId, assignment.vgId);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetCurrentOffset, assignment.currentOffset);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetBegin, assignment.begin);
(*env)->CallVoidMethod(env, jassignment, g_assignmentSetEnd, assignment.end);
(*env)->CallBooleanMethod(env, jarrayList, g_arrayListAddFp, jassignment);
}
tmq_free_assignment(pAssign);
return JNI_SUCCESS;
}
......@@ -663,9 +663,10 @@ typedef struct {
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
int32_t fixedSinkVgId; // 0 for shuffle
// fixedSinkVg is not applicable for encode and decode
SVgObj fixedSinkVg;
int32_t fixedSinkVgId; // 0 for shuffle
// transformation
char* sql;
......
......@@ -14,18 +14,8 @@
*/
#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndSnode.h"
#include "mndStb.h"
#include "mndStream.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "parser.h"
#include "tcompare.h"
......@@ -34,12 +24,8 @@
extern bool tsDeployOnSnode;
static int32_t mndAddTaskToTaskSet(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->selfChildId = childId;
taosArrayPush(pArray, &pTask);
return 0;
}
static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup);
static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask);
int32_t mndConvertRsmaTask(char** pDst, int32_t* pDstLen, const char* ast, int64_t uid, int8_t triggerType,
int64_t watermark, int64_t deleteMark) {
......@@ -97,7 +83,7 @@ END:
return terrno;
}
int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) {
if (pStream->smaId != 0) {
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
......@@ -106,16 +92,23 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
if (pTask->tbSink.pSchemaWrapper == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
}
return 0;
}
int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
#define SINK_NODE_LEVEL (0)
int32_t mndAddDispatcherForInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) {
bool isShuffle = false;
if (pStream->fixedSinkVgId == 0) {
SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb);
if (pDb != NULL && pDb->cfg.numOfVgroups > 1) {
isShuffle = true;
pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
......@@ -127,47 +120,46 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream
sdbRelease(pMnode->pSdb, pDb);
}
SArray* pSinkNodeList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList);
if (isShuffle) {
memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t sz = taosArrayGetSize(pVgs);
SArray* sinkLv = taosArrayGetP(pStream->tasks, 0);
int32_t sinkLvSize = taosArrayGetSize(sinkLv);
for (int32_t i = 0; i < sz; i++) {
int32_t numOfVgroups = taosArrayGetSize(pVgs);
for (int32_t i = 0; i < numOfVgroups; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(pVgs, i);
for (int32_t j = 0; j < sinkLvSize; j++) {
SStreamTask* pLastLevelTask = taosArrayGetP(sinkLv, j);
if (pLastLevelTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pLastLevelTask->id.taskId;
for (int32_t j = 0; j < numOfSinkNodes; j++) {
SStreamTask* pSinkTask = taosArrayGetP(pSinkNodeList, j);
if (pSinkTask->nodeId == pVgInfo->vgId) {
pVgInfo->taskId = pSinkTask->id.taskId;
break;
}
}
}
} else {
pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
SArray* pArray = taosArrayGetP(pStream->tasks, 0);
// one sink only
SStreamTask* lastLevelTask = taosArrayGetP(pArray, 0);
pTask->fixedEpDispatcher.taskId = lastLevelTask->id.taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
SStreamTask* pOneSinkTask = taosArrayGetP(pSinkNodeList, 0);
setFixedDownstreamEpInfo(pTask, pOneSinkTask);
}
return 0;
}
int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
int32_t mndAssignStreamTaskToVgroup(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, const SVgObj* pVgroup) {
int32_t msgLen;
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.nodeId = pTask->nodeId;
plan->execNode.epSet = pTask->epSet;
if (qSubPlanToString(plan, &pTask->exec.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
return 0;
}
......@@ -210,100 +202,121 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return pVgroup;
}
// create sink node for each vgroup.
int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb;
void* pIter = NULL;
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
while (1) {
SVgObj* pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
if (pIter == NULL) {
break;
}
SStreamTask* pTask = tNewStreamTask(pStream->uid);
if (pTask == NULL) {
if (!mndVgroupInDb(pVgroup, pStream->targetDbUid)) {
sdbRelease(pSdb, pVgroup);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
continue;
}
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(tasks, pTask);
pTask->nodeId = pVgroup->vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
// type
pTask->taskLevel = TASK_LEVEL__SINK;
// sink
if (pStream->smaId != 0) {
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
} else {
pTask->outputType = TASK_OUTPUT__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
if (pTask->tbSink.pSchemaWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
mndAddSinkTaskToStream(pStream, pMnode, pVgroup->vgId, pVgroup);
sdbRelease(pSdb, pVgroup);
}
return 0;
}
int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) {
SArray* tasks = taosArrayGetP(pStream->tasks, 0);
SStreamTask* pTask = tNewStreamTask(pStream->uid);
int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup) {
SArray* pTaskList = taosArrayGetP(pStream->tasks, SINK_NODE_LEVEL);
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SINK, pStream->fillHistory, 0, pTaskList);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(tasks, pTask);
pTask->nodeId = pStream->fixedSinkVgId;
#if 0
SVgObj* pVgroup = mndAcquireVgroup(pMnode, pStream->fixedSinkVgId);
if (pVgroup == NULL) {
return -1;
}
pTask->nodeId = vgId;
pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup);
#endif
pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg);
mndSetSinkTaskInfo(pStream, pTask);
return 0;
}
pTask->taskLevel = TASK_LEVEL__SINK;
static int32_t mndScheduleFillHistoryStreamTask(SMnode* pMnode, SStreamObj* pStream) {
return 0;
}
// sink
if (pStream->smaId != 0) {
pTask->outputType = TASK_OUTPUT__SMA;
pTask->smaSink.smaId = pStream->smaId;
static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTaskList, SStreamObj* pStream,
SSubplan* plan, uint64_t uid, int8_t taskLevel, int8_t fillHistory,
bool hasExtraSink) {
SStreamTask* pTask = tNewStreamTask(uid, taskLevel, fillHistory, pStream->triggerParam, pTaskList);
if (pTask == NULL) {
return terrno;
}
// sink or dispatch
if (hasExtraSink) {
mndAddDispatcherForInnerTask(pMnode, pStream, pTask);
} else {
pTask->outputType = TASK_OUTPUT__TABLE;
pTask->tbSink.stbUid = pStream->targetStbUid;
memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
mndSetSinkTaskInfo(pStream, pTask);
}
return 0;
if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
return terrno;
}
return TSDB_CODE_SUCCESS;
}
static SStreamChildEpInfo* createStreamTaskEpInfo(SStreamTask* pTask) {
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pEpInfo->childId = pTask->selfChildId;
pEpInfo->epSet = pTask->epSet;
pEpInfo->nodeId = pTask->nodeId;
pEpInfo->taskId = pTask->id.taskId;
return pEpInfo;
}
void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) {
STaskDispatcherFixedEp* pDispatcher = &pDstTask->fixedEpDispatcher;
pDispatcher->taskId = pTask->id.taskId;
pDispatcher->nodeId = pTask->nodeId;
pDispatcher->epSet = pTask->epSet;
pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
pDstTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
}
int32_t appendToUpstream(SStreamTask* pTask, SStreamTask* pUpstream) {
SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pTask);
if (pEpInfo == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if(pUpstream->childEpInfo == NULL) {
pUpstream->childEpInfo = taosArrayInit(4, POINTER_BYTES);
}
taosArrayPush(pUpstream->childEpInfo, &pEpInfo);
return TSDB_CODE_SUCCESS;
}
int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SSdb* pSdb = pMnode->pSdb;
SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*));
int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
pStream->tasks = taosArrayInit(planTotLevel, POINTER_BYTES);
bool hasExtraSink = false;
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
......@@ -313,13 +326,13 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
bool multiTarget = (pDbObj->cfg.numOfVgroups > 1);
sdbRelease(pSdb, pDbObj);
if (planTotLevel == 2 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) {
/*if (true) {*/
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
SArray* taskOneLevel = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &taskOneLevel);
// add extra sink
hasExtraSink = true;
if (pStream->fixedSinkVgId == 0) {
......@@ -328,19 +341,20 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
} else {
if (mndAddFixedSinkTaskToStream(pMnode, pStream) < 0) {
if (mndAddSinkTaskToStream(pStream, pMnode, pStream->fixedSinkVgId, &pStream->fixedSinkVg) < 0) {
// TODO free
return -1;
}
}
}
pStream->totalLevel = planTotLevel + hasExtraSink;
if (planTotLevel > 1) {
SStreamTask* pInnerTask;
// inner level
{
SArray* taskInnerLevel = taosArrayInit(0, sizeof(void*));
SArray* taskInnerLevel = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &taskInnerLevel);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
......@@ -350,25 +364,15 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1;
}
pInnerTask = tNewStreamTask(pStream->uid);
pInnerTask = tNewStreamTask(pStream->uid, TASK_LEVEL__AGG, pStream->fillHistory, pStream->triggerParam, taskInnerLevel);
if (pInnerTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
qDestroyQueryPlan(pPlan);
return -1;
}
pInnerTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(taskInnerLevel, pInnerTask);
pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*));
pInnerTask->taskLevel = TASK_LEVEL__AGG;
// trigger
pInnerTask->triggerParam = pStream->triggerParam;
// dispatch
if (mndAddDispatcherToInnerTask(pMnode, pStream, pInnerTask) < 0) {
if (mndAddDispatcherForInnerTask(pMnode, pStream, pInnerTask) < 0) {
qDestroyQueryPlan(pPlan);
return -1;
}
......@@ -377,7 +381,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
if (pSnode == NULL) {
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) {
if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
......@@ -392,17 +396,18 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
}
} else {
SVgObj* pVgroup = mndSchedFetchOneVg(pMnode, pStream->sourceDbUid);
if (mndAssignTaskToVg(pMnode, pInnerTask, plan, pVgroup) < 0) {
if (mndAssignStreamTaskToVgroup(pMnode, pInnerTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
sdbRelease(pSdb, pVgroup);
}
}
// source level
SArray* taskSourceLevel = taosArrayInit(0, sizeof(void*));
SArray* taskSourceLevel = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &taskSourceLevel);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 1);
......@@ -416,66 +421,52 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pIter == NULL) {
break;
}
if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
}
SStreamTask* pTask = tNewStreamTask(pStream->uid);
SStreamTask* pTask = tNewStreamTask(pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, 0, taskSourceLevel);
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(taskSourceLevel, pTask);
pTask->triggerParam = 0;
// source
pTask->taskLevel = TASK_LEVEL__SOURCE;
// add fixed vg dispatch
pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH;
pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH;
// all the source tasks dispatch result to a single agg node.
setFixedDownstreamEpInfo(pTask, pInnerTask);
pTask->fixedEpDispatcher.taskId = pInnerTask->id.taskId;
pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId;
pTask->fixedEpDispatcher.epSet = pInnerTask->epSet;
if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
if (mndAssignStreamTaskToVgroup(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
}
SStreamChildEpInfo* pEpInfo = taosMemoryMalloc(sizeof(SStreamChildEpInfo));
if (pEpInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
sdbRelease(pSdb, pVgroup);
int32_t code = appendToUpstream(pTask, pInnerTask);
sdbRelease(pSdb, pVgroup);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
qDestroyQueryPlan(pPlan);
return -1;
}
pEpInfo->childId = pTask->selfChildId;
pEpInfo->epSet = pTask->epSet;
pEpInfo->nodeId = pTask->nodeId;
pEpInfo->taskId = pTask->id.taskId;
taosArrayPush(pInnerTask->childEpInfo, &pEpInfo);
sdbRelease(pSdb, pVgroup);
}
}
if (planTotLevel == 1) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel);
} else if (planTotLevel == 1) {
// create exec stream task, since only one level, the exec task is also the source task
SArray* pTaskList = taosArrayInit(0, POINTER_BYTES);
taosArrayPush(pStream->tasks, &pTaskList);
SNodeListNode* inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
if (LIST_LENGTH(inner->pNodeList) != 1) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
SSubplan* plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
if (plan->subplanType != SUBPLAN_TYPE_SCAN) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
......@@ -486,42 +477,26 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
while (1) {
SVgObj* pVgroup;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup);
continue;
if (pIter == NULL) {
break;
}
SStreamTask* pTask = tNewStreamTask(pStream->uid);
if (pTask == NULL) {
if (!mndVgroupInDb(pVgroup, pStream->sourceDbUid)) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan);
return -1;
continue;
}
pTask->fillHistory = pStream->fillHistory;
mndAddTaskToTaskSet(taskOneLevel, pTask);
// source
pTask->taskLevel = TASK_LEVEL__SOURCE;
// trigger
pTask->triggerParam = pStream->triggerParam;
// sink or dispatch
if (hasExtraSink) {
mndAddDispatcherToInnerTask(pMnode, pStream, pTask);
} else {
mndAddSinkToTask(pMnode, pStream, pTask);
}
// new stream task
int32_t code = addSourceStreamTask(pMnode, pVgroup, pTaskList, pStream, plan, pStream->uid, TASK_LEVEL__SOURCE, pStream->fillHistory, hasExtraSink);
sdbRelease(pSdb, pVgroup);
if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) {
sdbRelease(pSdb, pVgroup);
if (code != TSDB_CODE_SUCCESS) {
qDestroyQueryPlan(pPlan);
return -1;
}
sdbRelease(pSdb, pVgroup);
}
}
qDestroyQueryPlan(pPlan);
return 0;
}
......
......@@ -700,6 +700,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
if (pStream->sourceDbUid == streamObj.sourceDbUid) {
++numOfStream;
}
sdbRelease(pMnode->pSdb, pStream);
if (numOfStream > MND_STREAM_MAX_NUM) {
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
......@@ -723,6 +724,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
pDb = NULL;
goto _OVER;
}
mndReleaseDb(pMnode, pDb);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
......
......@@ -99,6 +99,7 @@ target_include_directories(
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
)
IF (TD_LINUX)
target_link_libraries(
vnode
PUBLIC os
......@@ -119,6 +120,28 @@ target_link_libraries(
PUBLIC stream
PUBLIC index
)
ELSE()
target_link_libraries(
vnode
PUBLIC os
PUBLIC util
PUBLIC common
PUBLIC tfs
PUBLIC wal
PUBLIC qworker
PUBLIC sync
PUBLIC executor
PUBLIC scheduler
PUBLIC tdb
# PUBLIC bdb
# PUBLIC scalar
PUBLIC rocksdb
PUBLIC transport
PUBLIC stream
PUBLIC index
)
ENDIF()
IF (TD_GRANT)
TARGET_LINK_LIBRARIES(vnode PUBLIC grant)
......
......@@ -1231,7 +1231,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
streamProcessRunReq(pTask);
} else {
if (streamTaskShouldPause(&pTask->status)) {
atomic_val_compare_exchange_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__WAITING, TASK_SCHED_STATUS__INACTIVE);
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
}
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state", vgId, pTask->id.idStr);
}
......
......@@ -128,6 +128,7 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
rocksdb_options_set_comparator(options, cmp);
rocksdb_block_based_options_set_block_cache(tableoptions, cache);
rocksdb_options_set_block_based_table_factory(options, tableoptions);
rocksdb_options_set_info_log_level(options, 2); // WARN_LEVEL
// rocksdb_options_set_inplace_update_support(options, 1);
// rocksdb_options_set_allow_concurrent_memtable_write(options, 0);
......
......@@ -19,7 +19,7 @@
#include "trpc.h"
extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {.statEnable = true};
SCtgDebug gCTGDebug = {0};
#if 0
......
......@@ -6,12 +6,22 @@ target_include_directories(
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
if(${BUILD_WITH_ROCKSDB})
IF (TD_LINUX)
target_link_libraries(
stream
PUBLIC rocksdb-shared tdb
PRIVATE os util transport qcom executor wal index
)
ELSE()
target_link_libraries(
stream
PUBLIC rocksdb tdb
PRIVATE os util transport qcom executor wal index
)
ENDIF()
target_include_directories(
stream
......
......@@ -17,14 +17,25 @@
#include "tstream.h"
#include "wal.h"
SStreamTask* tNewStreamTask(int64_t streamId) {
static int32_t mndAddToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);
pTask->selfChildId = childId;
taosArrayPush(pArray, &pTask);
return 0;
}
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList) {
SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pTask->id.taskId = tGenIdPI32();
pTask->id.streamId = streamId;
pTask->taskLevel = taskLevel;
pTask->fillHistory = fillHistory;
pTask->triggerParam = triggerParam;
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);
......@@ -34,6 +45,7 @@ SStreamTask* tNewStreamTask(int64_t streamId) {
pTask->inputStatus = TASK_INPUT_STATUS__NORMAL;
pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL;
mndAddToTaskset(pTaskList, pTask);
return pTask;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册