Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
648e0ac1
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
648e0ac1
编写于
6月 01, 2022
作者:
C
Cary Xu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/TD-14481-3.0
上级
15b25eee
a5c0802b
变更
63
展开全部
隐藏空白更改
内联
并排
Showing
63 changed file
with
2116 addition
and
3251 deletion
+2116
-3251
docs-cn/12-taos-sql/07-function.md
docs-cn/12-taos-sql/07-function.md
+640
-1276
docs-cn/12-taos-sql/12-keywords.md
docs-cn/12-taos-sql/12-keywords.md
+3
-0
docs-en/12-taos-sql/07-function.md
docs-en/12-taos-sql/07-function.md
+592
-1271
docs-en/12-taos-sql/12-keywords.md
docs-en/12-taos-sql/12-keywords.md
+1
-0
example/src/tmq.c
example/src/tmq.c
+4
-41
include/client/taos.h
include/client/taos.h
+12
-1
include/common/tmsg.h
include/common/tmsg.h
+1
-1
include/libs/parser/parser.h
include/libs/parser/parser.h
+2
-2
include/os/osDir.h
include/os/osDir.h
+11
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-1
include/util/tdef.h
include/util/tdef.h
+1
-2
source/client/inc/clientStmt.h
source/client/inc/clientStmt.h
+3
-0
source/client/src/clientMain.c
source/client/src/clientMain.c
+41
-0
source/client/src/clientStmt.c
source/client/src/clientStmt.c
+114
-7
source/client/src/tmq.c
source/client/src/tmq.c
+14
-20
source/common/src/systable.c
source/common/src/systable.c
+0
-1
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+14
-65
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+3
-3
source/dnode/mnode/impl/src/mndAcct.c
source/dnode/mnode/impl/src/mndAcct.c
+1
-1
source/dnode/mnode/impl/src/mndBnode.c
source/dnode/mnode/impl/src/mndBnode.c
+2
-2
source/dnode/mnode/impl/src/mndCluster.c
source/dnode/mnode/impl/src/mndCluster.c
+2
-5
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+3
-3
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+3
-3
source/dnode/mnode/impl/src/mndDnode.c
source/dnode/mnode/impl/src/mndDnode.c
+5
-9
source/dnode/mnode/impl/src/mndFunc.c
source/dnode/mnode/impl/src/mndFunc.c
+2
-2
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+3
-3
source/dnode/mnode/impl/src/mndMnode.c
source/dnode/mnode/impl/src/mndMnode.c
+18
-11
source/dnode/mnode/impl/src/mndOffset.c
source/dnode/mnode/impl/src/mndOffset.c
+1
-1
source/dnode/mnode/impl/src/mndQnode.c
source/dnode/mnode/impl/src/mndQnode.c
+2
-2
source/dnode/mnode/impl/src/mndSma.c
source/dnode/mnode/impl/src/mndSma.c
+3
-3
source/dnode/mnode/impl/src/mndSnode.c
source/dnode/mnode/impl/src/mndSnode.c
+2
-2
source/dnode/mnode/impl/src/mndStb.c
source/dnode/mnode/impl/src/mndStb.c
+3
-3
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+1
-1
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+6
-7
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+2
-2
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+93
-172
source/dnode/mnode/impl/src/mndUser.c
source/dnode/mnode/impl/src/mndUser.c
+4
-8
source/dnode/mnode/impl/test/trans/trans2.cpp
source/dnode/mnode/impl/test/trans/trans2.cpp
+9
-5
source/dnode/mnode/impl/test/user/CMakeLists.txt
source/dnode/mnode/impl/test/user/CMakeLists.txt
+6
-4
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+4
-3
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+13
-6
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+27
-182
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+134
-0
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+2
-0
source/dnode/vnode/test/tsdbSmaTest.cpp
source/dnode/vnode/test/tsdbSmaTest.cpp
+1
-1
source/libs/catalog/test/catalogTests.cpp
source/libs/catalog/test/catalogTests.cpp
+1
-1
source/libs/function/src/builtinsimpl.c
source/libs/function/src/builtinsimpl.c
+2
-2
source/libs/parser/src/parInsert.c
source/libs/parser/src/parInsert.c
+17
-12
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+2
-1
source/libs/qworker/test/qworkerTests.cpp
source/libs/qworker/test/qworkerTests.cpp
+1
-1
source/libs/scalar/test/filter/filterTests.cpp
source/libs/scalar/test/filter/filterTests.cpp
+1
-1
source/libs/scalar/test/scalar/CMakeLists.txt
source/libs/scalar/test/scalar/CMakeLists.txt
+6
-4
source/libs/scalar/test/scalar/scalarTests.cpp
source/libs/scalar/test/scalar/scalarTests.cpp
+1
-1
source/libs/scheduler/test/schedulerTests.cpp
source/libs/scheduler/test/schedulerTests.cpp
+1
-1
source/os/CMakeLists.txt
source/os/CMakeLists.txt
+5
-1
source/os/src/osEnv.c
source/os/src/osEnv.c
+2
-2
source/util/src/terror.c
source/util/src/terror.c
+3
-3
tests/script/api/batchprepare.c
tests/script/api/batchprepare.c
+251
-67
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+0
-1
tests/script/tsim/mnode/basic3.sim
tests/script/tsim/mnode/basic3.sim
+10
-1
tests/script/tsim/trans/create_db.sim
tests/script/tsim/trans/create_db.sim
+1
-17
tests/test/c/sdbDump.c
tests/test/c/sdbDump.c
+2
-2
tools/taos-tools
tools/taos-tools
+1
-1
未找到文件。
docs-cn/12-taos-sql/07-function.md
浏览文件 @
648e0ac1
此差异已折叠。
点击以展开。
docs-cn/12-taos-sql/12-keywords.md
浏览文件 @
648e0ac1
...
...
@@ -93,10 +93,13 @@ title: TDengine 参数限制与保留关键字
`TBNAME` 可以视为超级表中一个特殊的标签,代表子表的表名。
获取一个超级表所有的子表名及相关的标签信息:
```mysql
SELECT TBNAME, location FROM meters;
```
统计超级表下辖子表数量:
```mysql
SELECT COUNT(TBNAME) FROM meters;
```
...
...
docs-en/12-taos-sql/07-function.md
浏览文件 @
648e0ac1
此差异已折叠。
点击以展开。
docs-en/12-taos-sql/12-keywords.md
浏览文件 @
648e0ac1
...
...
@@ -56,6 +56,7 @@ There are about 200 keywords reserved by TDengine, they can't be used as the nam
Get the table name and tag values of all subtables in a STable.
```
mysql
SELECT TBNAME, location FROM meters;
```
Count the number of subtables in a STable.
```
mysql
...
...
example/src/tmq.c
浏览文件 @
648e0ac1
...
...
@@ -165,7 +165,6 @@ tmq_t* build_consumer() {
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"false"
);
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
...
...
@@ -191,20 +190,18 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
return
;
}
int32_t
cnt
=
0
;
/*clock_t startTime = clock();*/
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
0
);
if
(
tmqmessage
)
{
cnt
++
;
msg_process
(
tmqmessage
);
if
(
cnt
>=
2
)
break
;
/*printf("get data\n");*/
/*msg_process(tmqmessage);*/
taos_free_result
(
tmqmessage
);
/*} else {*/
/*break;*/
}
}
/*clock_t endTime = clock();*/
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
...
...
@@ -253,39 +250,6 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
void
perf_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
tmq_resp_err_t
err
;
if
((
err
=
tmq_subscribe
(
tmq
,
topics
)))
{
fprintf
(
stderr
,
"%% Failed to start consuming topics: %s
\n
"
,
tmq_err2str
(
err
));
printf
(
"subscribe err
\n
"
);
return
;
}
int32_t
batchCnt
=
0
;
int32_t
skipLogNum
=
0
;
clock_t
startTime
=
clock
();
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
500
);
if
(
tmqmessage
)
{
batchCnt
++
;
/*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/
/*msg_process(tmqmessage);*/
taos_free_result
(
tmqmessage
);
}
else
{
break
;
}
}
clock_t
endTime
=
clock
();
printf
(
"log batch cnt: %d, skip log cnt: %d, time used:%f s
\n
"
,
batchCnt
,
skipLogNum
,
(
double
)(
endTime
-
startTime
)
/
CLOCKS_PER_SEC
);
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
fprintf
(
stderr
,
"%% Failed to close consumer: %s
\n
"
,
tmq_err2str
(
err
));
else
fprintf
(
stderr
,
"%% Consumer closed
\n
"
);
}
int
main
(
int
argc
,
char
*
argv
[])
{
if
(
argc
>
1
)
{
printf
(
"env init
\n
"
);
...
...
@@ -296,7 +260,6 @@ int main(int argc, char* argv[]) {
}
tmq_t
*
tmq
=
build_consumer
();
tmq_list_t
*
topic_list
=
build_topic_list
();
/*perf_loop(tmq, topic_list);*/
/*basic_consume_loop(tmq, topic_list);*/
sync_consume_loop
(
tmq
,
topic_list
);
basic_consume_loop
(
tmq
,
topic_list
);
/*sync_consume_loop(tmq, topic_list);*/
}
include/client/taos.h
浏览文件 @
648e0ac1
...
...
@@ -85,6 +85,14 @@ typedef struct taosField {
int32_t
bytes
;
}
TAOS_FIELD
;
typedef
struct
TAOS_FIELD_E
{
char
name
[
65
];
int8_t
type
;
uint8_t
precision
;
uint8_t
scale
;
int32_t
bytes
;
}
TAOS_FIELD_E
;
#ifdef WINDOWS
#define DLL_EXPORT __declspec(dllexport)
#else
...
...
@@ -134,7 +142,10 @@ DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT
int
taos_stmt_prepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
);
DLL_EXPORT
int
taos_stmt_set_tbname_tags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_MULTI_BIND
*
tags
);
DLL_EXPORT
int
taos_stmt_set_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
);
DLL_EXPORT
int
taos_stmt_set_tags
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
tags
);
DLL_EXPORT
int
taos_stmt_set_sub_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
);
DLL_EXPORT
int
taos_stmt_get_tag_fields
(
TAOS_STMT
*
stmt
,
int
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
DLL_EXPORT
int
taos_stmt_get_col_fields
(
TAOS_STMT
*
stmt
,
int
*
fieldNum
,
TAOS_FIELD_E
**
fields
);
DLL_EXPORT
int
taos_stmt_is_insert
(
TAOS_STMT
*
stmt
,
int
*
insert
);
DLL_EXPORT
int
taos_stmt_num_params
(
TAOS_STMT
*
stmt
,
int
*
nums
);
...
...
@@ -230,7 +241,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
DLL_EXPORT
tmq_resp_err_t
tmq_subscribe
(
tmq_t
*
tmq
,
const
tmq_list_t
*
topic_list
);
DLL_EXPORT
tmq_resp_err_t
tmq_unsubscribe
(
tmq_t
*
tmq
);
DLL_EXPORT
tmq_resp_err_t
tmq_subscription
(
tmq_t
*
tmq
,
tmq_list_t
**
topics
);
DLL_EXPORT
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
wait_time
);
DLL_EXPORT
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
timeout
);
DLL_EXPORT
tmq_resp_err_t
tmq_consumer_close
(
tmq_t
*
tmq
);
DLL_EXPORT
tmq_resp_err_t
tmq_commit_sync
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
);
DLL_EXPORT
void
tmq_commit_async
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
,
tmq_commit_cb
*
cb
,
void
*
param
);
...
...
include/common/tmsg.h
浏览文件 @
648e0ac1
...
...
@@ -2439,7 +2439,7 @@ typedef struct {
int32_t
epoch
;
uint64_t
reqId
;
int64_t
consumerId
;
int64_t
waitTime
;
int64_t
timeout
;
int64_t
currentOffset
;
}
SMqPollReq
;
...
...
include/libs/parser/parser.h
浏览文件 @
648e0ac1
...
...
@@ -77,8 +77,8 @@ int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery);
int32_t
qBindStmtColsValue
(
void
*
pBlock
,
TAOS_MULTI_BIND
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
);
int32_t
qBindStmtSingleColValue
(
void
*
pBlock
,
TAOS_MULTI_BIND
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
,
int32_t
colIdx
,
int32_t
rowNum
);
int32_t
qBuildStmtColFields
(
void
*
pDataBlock
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
);
int32_t
qBuildStmtTagFields
(
void
*
pBlock
,
void
*
boundTags
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
);
int32_t
qBuildStmtColFields
(
void
*
pDataBlock
,
int32_t
*
fieldNum
,
TAOS_FIELD
_E
**
fields
);
int32_t
qBuildStmtTagFields
(
void
*
pBlock
,
void
*
boundTags
,
int32_t
*
fieldNum
,
TAOS_FIELD
_E
**
fields
);
int32_t
qBindStmtTagsValue
(
void
*
pBlock
,
void
*
boundTags
,
int64_t
suid
,
char
*
tName
,
TAOS_MULTI_BIND
*
bind
,
char
*
msgBuf
,
int32_t
msgBufLen
);
void
destroyBoundColumnInfo
(
void
*
pBoundInfo
);
...
...
include/os/osDir.h
浏览文件 @
648e0ac1
...
...
@@ -33,8 +33,19 @@ extern "C" {
#ifdef WINDOWS
#define TD_TMP_DIR_PATH "C:\\Windows\\Temp\\"
#define TD_CFG_DIR_PATH "C:\\TDengine\\cfg\\"
#define TD_DATA_DIR_PATH "C:\\TDengine\\data\\"
#define TD_LOG_DIR_PATH "C:\\TDengine\\log\\"
#elif defined(_TD_DARWIN_64)
#define TD_TMP_DIR_PATH "/tmp/taosd/"
#define TD_CFG_DIR_PATH "/usr/local/etc/taos/"
#define TD_DATA_DIR_PATH "/usr/local/var/lib/taos/"
#define TD_LOG_DIR_PATH "/usr/local/var/log/taos/"
#else
#define TD_TMP_DIR_PATH "/tmp/"
#define TD_CFG_DIR_PATH "/etc/taos/"
#define TD_DATA_DIR_PATH "/var/lib/taos/"
#define TD_LOG_DIR_PATH "/var/log/taos/"
#endif
typedef
struct
TdDir
*
TdDirPtr
;
...
...
include/util/taoserror.h
浏览文件 @
648e0ac1
...
...
@@ -183,7 +183,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_BNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0356)
#define TSDB_CODE_MND_BNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0357)
#define TSDB_CODE_MND_TOO_FEW_MNODES TAOS_DEF_ERROR_CODE(0, 0x0358)
#define TSDB_CODE_MND_
MNODE_DEPLOYED
TAOS_DEF_ERROR_CODE(0, 0x0359)
#define TSDB_CODE_MND_
TOO_MANY_MNODES
TAOS_DEF_ERROR_CODE(0, 0x0359)
#define TSDB_CODE_MND_CANT_DROP_MASTER TAOS_DEF_ERROR_CODE(0, 0x035A)
// mnode-acct
...
...
include/util/tdef.h
浏览文件 @
648e0ac1
...
...
@@ -253,8 +253,7 @@ typedef enum ELogicConditionType {
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16
#define TSDB_TRANS_ERROR_LEN 64
#define TSDB_TRANS_DESC_LEN 128
#define TSDB_TRANS_ERROR_LEN 512
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128
...
...
source/client/inc/clientStmt.h
浏览文件 @
648e0ac1
...
...
@@ -116,8 +116,11 @@ int stmtAffectedRowsOnce(TAOS_STMT *stmt);
int
stmtPrepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
);
int
stmtSetTbName
(
TAOS_STMT
*
stmt
,
const
char
*
tbName
);
int
stmtSetTbTags
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
tags
);
int
stmtGetTagFields
(
TAOS_STMT
*
stmt
,
int
*
nums
,
TAOS_FIELD_E
**
fields
);
int
stmtGetColFields
(
TAOS_STMT
*
stmt
,
int
*
nums
,
TAOS_FIELD_E
**
fields
);
int
stmtIsInsert
(
TAOS_STMT
*
stmt
,
int
*
insert
);
int
stmtGetParamNum
(
TAOS_STMT
*
stmt
,
int
*
nums
);
int
stmtGetParam
(
TAOS_STMT
*
stmt
,
int
idx
,
int
*
type
,
int
*
bytes
);
int
stmtAddBatch
(
TAOS_STMT
*
stmt
);
TAOS_RES
*
stmtUseResult
(
TAOS_STMT
*
stmt
);
int
stmtBindBatch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
,
int32_t
colIdx
);
...
...
source/client/src/clientMain.c
浏览文件 @
648e0ac1
...
...
@@ -666,8 +666,39 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
return
stmtSetTbName
(
stmt
,
name
);
}
int
taos_stmt_set_tags
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
tags
)
{
if
(
stmt
==
NULL
||
tags
==
NULL
)
{
tscError
(
"NULL parameter for %s"
,
__FUNCTION__
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
return
stmtSetTbTags
(
stmt
,
tags
);
}
int
taos_stmt_set_sub_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
)
{
return
taos_stmt_set_tbname
(
stmt
,
name
);
}
int
taos_stmt_get_tag_fields
(
TAOS_STMT
*
stmt
,
int
*
fieldNum
,
TAOS_FIELD_E
**
fields
)
{
if
(
stmt
==
NULL
||
NULL
==
fieldNum
)
{
tscError
(
"NULL parameter for %s"
,
__FUNCTION__
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
return
stmtGetTagFields
(
stmt
,
fieldNum
,
fields
);
}
int
taos_stmt_get_col_fields
(
TAOS_STMT
*
stmt
,
int
*
fieldNum
,
TAOS_FIELD_E
**
fields
)
{
if
(
stmt
==
NULL
||
NULL
==
fieldNum
)
{
tscError
(
"NULL parameter for %s"
,
__FUNCTION__
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
return
stmtGetColFields
(
stmt
,
fieldNum
,
fields
);
}
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
)
{
if
(
stmt
==
NULL
||
bind
==
NULL
)
{
tscError
(
"NULL parameter for %s"
,
__FUNCTION__
);
...
...
@@ -772,6 +803,16 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
return
stmtGetParamNum
(
stmt
,
nums
);
}
int
taos_stmt_get_param
(
TAOS_STMT
*
stmt
,
int
idx
,
int
*
type
,
int
*
bytes
)
{
if
(
stmt
==
NULL
||
type
==
NULL
||
NULL
==
bytes
||
idx
<
0
)
{
tscError
(
"invalid parameter for %s"
,
__FUNCTION__
);
terrno
=
TSDB_CODE_INVALID_PARA
;
return
terrno
;
}
return
stmtGetParam
(
stmt
,
idx
,
type
,
bytes
);
}
TAOS_RES
*
taos_stmt_use_result
(
TAOS_STMT
*
stmt
)
{
if
(
stmt
==
NULL
)
{
tscError
(
"NULL parameter for %s"
,
__FUNCTION__
);
...
...
source/client/src/clientStmt.c
浏览文件 @
648e0ac1
...
...
@@ -17,7 +17,7 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
}
break
;
case
STMT_SETTAGS
:
if
(
STMT_STATUS_NE
(
SETTBNAME
))
{
if
(
STMT_STATUS_NE
(
SETTBNAME
)
&&
STMT_STATUS_NE
(
FETCH_FIELDS
)
)
{
code
=
TSDB_CODE_TSC_STMT_API_ERROR
;
}
break
;
...
...
@@ -540,6 +540,8 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
if
(
pStmt
->
bInfo
.
needParse
)
{
strncpy
(
pStmt
->
bInfo
.
tbName
,
tbName
,
sizeof
(
pStmt
->
bInfo
.
tbName
)
-
1
);
pStmt
->
bInfo
.
tbName
[
sizeof
(
pStmt
->
bInfo
.
tbName
)
-
1
]
=
0
;
STMT_ERR_RET
(
stmtParseSql
(
pStmt
));
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -550,10 +552,6 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_SETTAGS
));
if
(
pStmt
->
bInfo
.
needParse
)
{
STMT_ERR_RET
(
stmtParseSql
(
pStmt
));
}
if
(
pStmt
->
bInfo
.
inExecCache
)
{
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -571,7 +569,7 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
return
TSDB_CODE_SUCCESS
;
}
int
32_t
stmtFetchTagFields
(
STscStmt
*
pStmt
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
)
{
int
stmtFetchTagFields
(
STscStmt
*
pStmt
,
int32_t
*
fieldNum
,
TAOS_FIELD_E
**
fields
)
{
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
tscError
(
"invalid operation to get query tag fileds"
);
STMT_ERR_RET
(
TSDB_CODE_TSC_STMT_API_ERROR
);
...
...
@@ -589,7 +587,7 @@ int32_t stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fiel
return
TSDB_CODE_SUCCESS
;
}
int
32_t
stmtFetchColFields
(
STscStmt
*
pStmt
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
)
{
int
stmtFetchColFields
(
STscStmt
*
pStmt
,
int32_t
*
fieldNum
,
TAOS_FIELD_E
**
fields
)
{
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
tscError
(
"invalid operation to get query column fileds"
);
STMT_ERR_RET
(
TSDB_CODE_TSC_STMT_API_ERROR
);
...
...
@@ -852,6 +850,71 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
return
TSDB_CODE_SUCCESS
;
}
int
stmtGetTagFields
(
TAOS_STMT
*
stmt
,
int
*
nums
,
TAOS_FIELD_E
**
fields
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
STMT_RET
(
TSDB_CODE_TSC_STMT_API_ERROR
);
}
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_FETCH_FIELDS
));
if
(
pStmt
->
bInfo
.
needParse
&&
pStmt
->
sql
.
runTimes
&&
pStmt
->
sql
.
type
>
0
&&
STMT_TYPE_MULTI_INSERT
!=
pStmt
->
sql
.
type
)
{
pStmt
->
bInfo
.
needParse
=
false
;
}
if
(
pStmt
->
exec
.
pRequest
&&
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
&&
pStmt
->
sql
.
runTimes
)
{
taos_free_result
(
pStmt
->
exec
.
pRequest
);
pStmt
->
exec
.
pRequest
=
NULL
;
}
if
(
NULL
==
pStmt
->
exec
.
pRequest
)
{
STMT_ERR_RET
(
buildRequest
(
pStmt
->
taos
,
pStmt
->
sql
.
sqlStr
,
pStmt
->
sql
.
sqlLen
,
&
pStmt
->
exec
.
pRequest
));
}
if
(
pStmt
->
bInfo
.
needParse
)
{
STMT_ERR_RET
(
stmtParseSql
(
pStmt
));
}
STMT_ERR_RET
(
stmtFetchTagFields
(
stmt
,
nums
,
fields
));
return
TSDB_CODE_SUCCESS
;
}
int
stmtGetColFields
(
TAOS_STMT
*
stmt
,
int
*
nums
,
TAOS_FIELD_E
**
fields
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
STMT_RET
(
TSDB_CODE_TSC_STMT_API_ERROR
);
}
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_FETCH_FIELDS
));
if
(
pStmt
->
bInfo
.
needParse
&&
pStmt
->
sql
.
runTimes
&&
pStmt
->
sql
.
type
>
0
&&
STMT_TYPE_MULTI_INSERT
!=
pStmt
->
sql
.
type
)
{
pStmt
->
bInfo
.
needParse
=
false
;
}
if
(
pStmt
->
exec
.
pRequest
&&
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
&&
pStmt
->
sql
.
runTimes
)
{
taos_free_result
(
pStmt
->
exec
.
pRequest
);
pStmt
->
exec
.
pRequest
=
NULL
;
}
if
(
NULL
==
pStmt
->
exec
.
pRequest
)
{
STMT_ERR_RET
(
buildRequest
(
pStmt
->
taos
,
pStmt
->
sql
.
sqlStr
,
pStmt
->
sql
.
sqlLen
,
&
pStmt
->
exec
.
pRequest
));
}
if
(
pStmt
->
bInfo
.
needParse
)
{
STMT_ERR_RET
(
stmtParseSql
(
pStmt
));
}
STMT_ERR_RET
(
stmtFetchColFields
(
stmt
,
nums
,
fields
));
return
TSDB_CODE_SUCCESS
;
}
int
stmtGetParamNum
(
TAOS_STMT
*
stmt
,
int
*
nums
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
...
...
@@ -884,6 +947,50 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
return
TSDB_CODE_SUCCESS
;
}
int
stmtGetParam
(
TAOS_STMT
*
stmt
,
int
idx
,
int
*
type
,
int
*
bytes
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
if
(
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
)
{
STMT_RET
(
TSDB_CODE_TSC_STMT_API_ERROR
);
}
STMT_ERR_RET
(
stmtSwitchStatus
(
pStmt
,
STMT_FETCH_FIELDS
));
if
(
pStmt
->
bInfo
.
needParse
&&
pStmt
->
sql
.
runTimes
&&
pStmt
->
sql
.
type
>
0
&&
STMT_TYPE_MULTI_INSERT
!=
pStmt
->
sql
.
type
)
{
pStmt
->
bInfo
.
needParse
=
false
;
}
if
(
pStmt
->
exec
.
pRequest
&&
STMT_TYPE_QUERY
==
pStmt
->
sql
.
type
&&
pStmt
->
sql
.
runTimes
)
{
taos_free_result
(
pStmt
->
exec
.
pRequest
);
pStmt
->
exec
.
pRequest
=
NULL
;
}
if
(
NULL
==
pStmt
->
exec
.
pRequest
)
{
STMT_ERR_RET
(
buildRequest
(
pStmt
->
taos
,
pStmt
->
sql
.
sqlStr
,
pStmt
->
sql
.
sqlLen
,
&
pStmt
->
exec
.
pRequest
));
}
if
(
pStmt
->
bInfo
.
needParse
)
{
STMT_ERR_RET
(
stmtParseSql
(
pStmt
));
}
int32_t
nums
=
0
;
TAOS_FIELD_E
*
pField
=
NULL
;
STMT_ERR_RET
(
stmtFetchColFields
(
stmt
,
&
nums
,
&
pField
));
if
(
idx
>=
nums
)
{
tscError
(
"idx %d is too big"
,
idx
);
taosMemoryFree
(
pField
);
STMT_ERR_RET
(
TSDB_CODE_INVALID_PARA
);
}
*
type
=
pField
[
idx
].
type
;
*
bytes
=
pField
[
idx
].
bytes
;
taosMemoryFree
(
pField
);
return
TSDB_CODE_SUCCESS
;
}
TAOS_RES
*
stmtUseResult
(
TAOS_STMT
*
stmt
)
{
STscStmt
*
pStmt
=
(
STscStmt
*
)
stmt
;
...
...
source/client/src/tmq.c
浏览文件 @
648e0ac1
...
...
@@ -1243,7 +1243,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
return
TMQ_RESP_ERR__FAIL
;
}
SMqPollReq
*
tmqBuildConsumeReqImpl
(
tmq_t
*
tmq
,
int64_t
waitTime
,
SMqClientTopic
*
pTopic
,
SMqClientVg
*
pVg
)
{
SMqPollReq
*
tmqBuildConsumeReqImpl
(
tmq_t
*
tmq
,
int64_t
timeout
,
SMqClientTopic
*
pTopic
,
SMqClientVg
*
pVg
)
{
int64_t
reqOffset
;
if
(
pVg
->
currentOffset
>=
0
)
{
reqOffset
=
pVg
->
currentOffset
;
...
...
@@ -1269,7 +1269,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic*
strcpy
(
pReq
->
subKey
+
tlen
+
1
,
pTopic
->
topicName
);
pReq
->
withTbName
=
tmq
->
withTbName
;
pReq
->
waitTime
=
waitTime
;
pReq
->
timeout
=
timeout
;
pReq
->
consumerId
=
tmq
->
consumerId
;
pReq
->
epoch
=
tmq
->
epoch
;
pReq
->
currentOffset
=
reqOffset
;
...
...
@@ -1297,7 +1297,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return
pRspObj
;
}
int32_t
tmqPollImpl
(
tmq_t
*
tmq
,
int64_t
waitTime
)
{
int32_t
tmqPollImpl
(
tmq_t
*
tmq
,
int64_t
timeout
)
{
/*printf("call poll\n");*/
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
)
{
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
...
...
@@ -1318,7 +1318,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t waitTime) {
#endif
}
atomic_store_32
(
&
pVg
->
vgSkipCnt
,
0
);
SMqPollReq
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
waitTime
,
pTopic
,
pVg
);
SMqPollReq
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
timeout
,
pTopic
,
pVg
);
if
(
pReq
==
NULL
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
tsem_post
(
&
tmq
->
rspSem
);
...
...
@@ -1388,7 +1388,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
return
0
;
}
SMqRspObj
*
tmqHandleAllRsp
(
tmq_t
*
tmq
,
int64_t
waitTime
,
bool
pollIfReset
)
{
SMqRspObj
*
tmqHandleAllRsp
(
tmq_t
*
tmq
,
int64_t
timeout
,
bool
pollIfReset
)
{
while
(
1
)
{
SMqRspWrapper
*
rspWrapper
=
NULL
;
taosGetQitem
(
tmq
->
qall
,
(
void
**
)
&
rspWrapper
);
...
...
@@ -1428,17 +1428,17 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t waitTime, bool pollIfReset) {
taosFreeQitem
(
rspWrapper
);
if
(
pollIfReset
&&
reset
)
{
tscDebug
(
"consumer %ld reset and repoll"
,
tmq
->
consumerId
);
tmqPollImpl
(
tmq
,
waitTime
);
tmqPollImpl
(
tmq
,
timeout
);
}
}
}
}
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
wait_time
)
{
TAOS_RES
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
timeout
)
{
SMqRspObj
*
rspObj
;
int64_t
startTime
=
taosGetTimestampMs
();
rspObj
=
tmqHandleAllRsp
(
tmq
,
wait_time
,
false
);
rspObj
=
tmqHandleAllRsp
(
tmq
,
timeout
,
false
);
if
(
rspObj
)
{
return
(
TAOS_RES
*
)
rspObj
;
}
...
...
@@ -1450,16 +1450,16 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
while
(
1
)
{
tmqHandleAllDelayedTask
(
tmq
);
if
(
tmqPollImpl
(
tmq
,
wait_time
)
<
0
)
return
NULL
;
if
(
tmqPollImpl
(
tmq
,
timeout
)
<
0
)
return
NULL
;
rspObj
=
tmqHandleAllRsp
(
tmq
,
wait_time
,
false
);
rspObj
=
tmqHandleAllRsp
(
tmq
,
timeout
,
false
);
if
(
rspObj
)
{
return
(
TAOS_RES
*
)
rspObj
;
}
if
(
wait_time
!=
0
)
{
if
(
timeout
!=
0
)
{
int64_t
endTime
=
taosGetTimestampMs
();
int64_t
leftTime
=
endTime
-
startTime
;
if
(
leftTime
>
wait_time
)
{
if
(
leftTime
>
timeout
)
{
tscDebug
(
"consumer %ld (epoch %d) timeout, no rsp"
,
tmq
->
consumerId
,
tmq
->
epoch
);
return
NULL
;
}
...
...
@@ -1474,10 +1474,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
tmq_resp_err_t
tmq_consumer_close
(
tmq_t
*
tmq
)
{
if
(
tmq
->
status
==
TMQ_CONSUMER_STATUS__READY
)
{
tmq_resp_err_t
rsp
=
tmq_commit_sync
(
tmq
,
NULL
);
if
(
rsp
==
TMQ_RESP_ERR__SUCCESS
)
{
// TODO: free resources
return
TMQ_RESP_ERR__SUCCESS
;
}
else
{
if
(
rsp
==
TMQ_RESP_ERR__FAIL
)
{
return
TMQ_RESP_ERR__FAIL
;
}
...
...
@@ -1485,10 +1482,7 @@ tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
rsp
=
tmq_subscribe
(
tmq
,
lst
);
tmq_list_destroy
(
lst
);
if
(
rsp
==
TMQ_RESP_ERR__SUCCESS
)
{
// TODO: free resources
return
TMQ_RESP_ERR__SUCCESS
;
}
else
{
if
(
rsp
==
TMQ_RESP_ERR__FAIL
)
{
return
TMQ_RESP_ERR__FAIL
;
}
}
...
...
source/common/src/systable.c
浏览文件 @
648e0ac1
...
...
@@ -215,7 +215,6 @@ static const SSysDbTableSchema transSchema[] = {
{.
name
=
"create_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"stage"
,
.
bytes
=
TSDB_TRANS_STAGE_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"db"
,
.
bytes
=
SYSTABLE_SCH_DB_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"type"
,
.
bytes
=
TSDB_TRANS_TYPE_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"failed_times"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"last_exec_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"last_error"
,
.
bytes
=
(
TSDB_TRANS_ERROR_LEN
-
1
)
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
648e0ac1
...
...
@@ -54,9 +54,11 @@ typedef enum {
}
EAuthOp
;
typedef
enum
{
TRN_STEP_LOG
=
1
,
TRN_STEP_ACTION
=
2
,
}
ETrnStep
;
TRN_CONFLICT_NOTHING
=
0
,
TRN_CONFLICT_GLOBAL
=
1
,
TRN_CONFLICT_DB
=
2
,
TRN_CONFLICT_DB_INSIDE
=
3
,
}
ETrnConflct
;
typedef
enum
{
TRN_STAGE_PREPARE
=
0
,
...
...
@@ -68,69 +70,15 @@ typedef enum {
TRN_STAGE_FINISHED
=
6
}
ETrnStage
;
typedef
enum
{
TRN_TYPE_BASIC_SCOPE
=
1000
,
TRN_TYPE_CREATE_ACCT
=
1001
,
TRN_TYPE_CREATE_CLUSTER
=
1002
,
TRN_TYPE_CREATE_USER
=
1003
,
TRN_TYPE_ALTER_USER
=
1004
,
TRN_TYPE_DROP_USER
=
1005
,
TRN_TYPE_CREATE_FUNC
=
1006
,
TRN_TYPE_DROP_FUNC
=
1007
,
TRN_TYPE_CREATE_SNODE
=
1010
,
TRN_TYPE_DROP_SNODE
=
1011
,
TRN_TYPE_CREATE_QNODE
=
1012
,
TRN_TYPE_DROP_QNODE
=
10013
,
TRN_TYPE_CREATE_BNODE
=
1014
,
TRN_TYPE_DROP_BNODE
=
1015
,
TRN_TYPE_CREATE_MNODE
=
1016
,
TRN_TYPE_DROP_MNODE
=
1017
,
TRN_TYPE_CREATE_TOPIC
=
1020
,
TRN_TYPE_DROP_TOPIC
=
1021
,
TRN_TYPE_SUBSCRIBE
=
1022
,
TRN_TYPE_REBALANCE
=
1023
,
TRN_TYPE_COMMIT_OFFSET
=
1024
,
TRN_TYPE_CREATE_STREAM
=
1025
,
TRN_TYPE_DROP_STREAM
=
1026
,
TRN_TYPE_ALTER_STREAM
=
1027
,
TRN_TYPE_CONSUMER_LOST
=
1028
,
TRN_TYPE_CONSUMER_RECOVER
=
1029
,
TRN_TYPE_DROP_CGROUP
=
1030
,
TRN_TYPE_BASIC_SCOPE_END
,
TRN_TYPE_GLOBAL_SCOPE
=
2000
,
TRN_TYPE_CREATE_DNODE
=
2001
,
TRN_TYPE_DROP_DNODE
=
2002
,
TRN_TYPE_GLOBAL_SCOPE_END
,
TRN_TYPE_DB_SCOPE
=
3000
,
TRN_TYPE_CREATE_DB
=
3001
,
TRN_TYPE_ALTER_DB
=
3002
,
TRN_TYPE_DROP_DB
=
3003
,
TRN_TYPE_SPLIT_VGROUP
=
3004
,
TRN_TYPE_MERGE_VGROUP
=
3015
,
TRN_TYPE_DB_SCOPE_END
,
TRN_TYPE_STB_SCOPE
=
4000
,
TRN_TYPE_CREATE_STB
=
4001
,
TRN_TYPE_ALTER_STB
=
4002
,
TRN_TYPE_DROP_STB
=
4003
,
TRN_TYPE_CREATE_SMA
=
4004
,
TRN_TYPE_DROP_SMA
=
4005
,
TRN_TYPE_STB_SCOPE_END
,
}
ETrnType
;
typedef
enum
{
TRN_POLICY_ROLLBACK
=
0
,
TRN_POLICY_RETRY
=
1
,
}
ETrnPolicy
;
typedef
enum
{
TRN_EXEC_P
ARA
LLEL
=
0
,
TRN_EXEC_
NO_PARALLE
L
=
1
,
}
ETrnExec
Type
;
TRN_EXEC_P
RAR
LLEL
=
0
,
TRN_EXEC_
SERIA
L
=
1
,
}
ETrnExec
;
typedef
enum
{
DND_REASON_ONLINE
=
0
,
...
...
@@ -159,8 +107,8 @@ typedef struct {
int32_t
id
;
ETrnStage
stage
;
ETrnPolicy
policy
;
ETrn
Type
type
;
ETrnExec
Type
parallel
;
ETrn
Conflct
conflict
;
ETrnExec
exec
;
int32_t
code
;
int32_t
failedTimes
;
SRpcHandleInfo
rpcInfo
;
...
...
@@ -172,10 +120,11 @@ typedef struct {
SArray
*
commitActions
;
int64_t
createdTime
;
int64_t
lastExecTime
;
int64_t
dbUid
;
int32_t
lastErrorAction
;
int32_t
lastErrorNo
;
tmsg_t
lastErrorMsgType
;
SEpSet
lastErrorEpset
;
char
dbname
[
TSDB_DB_FNAME_LEN
];
char
lastError
[
TSDB_TRANS_ERROR_LEN
];
char
desc
[
TSDB_TRANS_DESC_LEN
];
int32_t
startFunc
;
int32_t
stopFunc
;
int32_t
paramLen
;
...
...
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
648e0ac1
...
...
@@ -34,7 +34,7 @@ typedef struct {
int32_t
errCode
;
int32_t
acceptableCode
;
int8_t
stage
;
int8_t
isRaw
;
int8_t
actionType
;
// 0-msg, 1-raw
int8_t
rawWritten
;
int8_t
msgSent
;
int8_t
msgReceived
;
...
...
@@ -52,7 +52,7 @@ void mndCleanupTrans(SMnode *pMnode);
STrans
*
mndAcquireTrans
(
SMnode
*
pMnode
,
int32_t
transId
);
void
mndReleaseTrans
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
ETrn
Type
type
,
const
SRpcMsg
*
pReq
);
STrans
*
mndTransCreate
(
SMnode
*
pMnode
,
ETrnPolicy
policy
,
ETrn
Conflct
conflict
,
const
SRpcMsg
*
pReq
);
void
mndTransDrop
(
STrans
*
pTrans
);
int32_t
mndTransAppendRedolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
int32_t
mndTransAppendUndolog
(
STrans
*
pTrans
,
SSdbRaw
*
pRaw
);
...
...
@@ -62,7 +62,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void
mndTransSetRpcRsp
(
STrans
*
pTrans
,
void
*
pCont
,
int32_t
contLen
);
void
mndTransSetCb
(
STrans
*
pTrans
,
ETrnFunc
startFunc
,
ETrnFunc
stopFunc
,
void
*
param
,
int32_t
paramLen
);
void
mndTransSetDbInfo
(
STrans
*
pTrans
,
SDbObj
*
pDb
);
void
mndTransSet
NoParalle
l
(
STrans
*
pTrans
);
void
mndTransSet
Seria
l
(
STrans
*
pTrans
);
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
void
mndTransProcessRsp
(
SRpcMsg
*
pRsp
);
...
...
source/dnode/mnode/impl/src/mndAcct.c
浏览文件 @
648e0ac1
...
...
@@ -80,7 +80,7 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
mDebug
(
"acct:%s, will be created when deploying, raw:%p"
,
acctObj
.
acct
,
pRaw
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_CREATE_ACCT
,
NULL
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"acct:%s, failed to create since %s"
,
acctObj
.
acct
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndBnode.c
浏览文件 @
648e0ac1
...
...
@@ -246,7 +246,7 @@ static int32_t mndCreateBnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
bnodeObj
.
createdTime
=
taosGetTimestampMs
();
bnodeObj
.
updateTime
=
bnodeObj
.
createdTime
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_BNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create bnode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
...
...
@@ -363,7 +363,7 @@ static int32_t mndSetDropBnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SBn
static
int32_t
mndDropBnode
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SBnodeObj
*
pObj
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_DROP_BNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop bnode:%d"
,
pTrans
->
id
,
pObj
->
id
);
...
...
source/dnode/mnode/impl/src/mndCluster.c
浏览文件 @
648e0ac1
...
...
@@ -179,10 +179,8 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mDebug
(
"cluster:%"
PRId64
", will be created when deploying, raw:%p"
,
clusterObj
.
id
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_CLUSTER
,
NULL
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_NOTHING
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"cluster:%"
PRId64
", failed to create since %s"
,
clusterObj
.
id
,
terrstr
());
return
-
1
;
...
...
@@ -204,7 +202,6 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
int32_t
mndRetrieveClusters
(
SRpcMsg
*
pMsg
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
)
{
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
648e0ac1
...
...
@@ -97,7 +97,7 @@ static int32_t mndProcessConsumerLostMsg(SRpcMsg *pMsg) {
mndReleaseConsumer
(
pMnode
,
pConsumer
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_CONSUMER_LOST
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
pMsg
);
if
(
pTrans
==
NULL
)
goto
FAIL
;
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
FAIL
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
FAIL
;
...
...
@@ -121,7 +121,7 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) {
mndReleaseConsumer
(
pMnode
,
pConsumer
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_CONSUMER_RECOVER
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
pMsg
);
if
(
pTrans
==
NULL
)
goto
FAIL
;
if
(
mndSetConsumerCommitLogs
(
pMnode
,
pTrans
,
pConsumerNew
)
!=
0
)
goto
FAIL
;
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
goto
FAIL
;
...
...
@@ -403,7 +403,7 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
int32_t
newTopicNum
=
taosArrayGetSize
(
newSub
);
// check topic existance
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_SUBSCRIBE
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
pMsg
);
if
(
pTrans
==
NULL
)
goto
SUBSCRIBE_OVER
;
for
(
int32_t
i
=
0
;
i
<
newTopicNum
;
i
++
)
{
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
648e0ac1
...
...
@@ -545,7 +545,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
}
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE
_DB
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT
_DB
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create db:%s"
,
pTrans
->
id
,
pCreate
->
db
);
...
...
@@ -775,7 +775,7 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static
int32_t
mndAlterDb
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pOld
,
SDbObj
*
pNew
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_ALTER
_DB
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT
_DB
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to alter db:%s"
,
pTrans
->
id
,
pOld
->
name
);
...
...
@@ -1036,7 +1036,7 @@ static int32_t mndBuildDropDbRsp(SDbObj *pDb, int32_t *pRspLen, void **ppRsp, bo
static
int32_t
mndDropDb
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pDb
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_DROP
_DB
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT
_DB
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop db:%s"
,
pTrans
->
id
,
pDb
->
name
);
...
...
source/dnode/mnode/impl/src/mndDnode.c
浏览文件 @
648e0ac1
...
...
@@ -101,10 +101,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
mDebug
(
"dnode:%d, will be created when deploying, raw:%p"
,
dnodeObj
.
id
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_DNODE
,
NULL
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"dnode:%s, failed to create since %s"
,
dnodeObj
.
ep
,
terrstr
());
return
-
1
;
...
...
@@ -126,7 +123,6 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
SSdbRaw
*
mndDnodeActionEncode
(
SDnodeObj
*
pDnode
)
{
...
...
@@ -260,7 +256,7 @@ int32_t mndGetDnodeSize(SMnode *pMnode) {
bool
mndIsDnodeOnline
(
SMnode
*
pMnode
,
SDnodeObj
*
pDnode
,
int64_t
curMs
)
{
int64_t
interval
=
TABS
(
pDnode
->
lastAccessTime
-
curMs
);
if
(
interval
>
30
000
*
tsStatusInterval
)
{
if
(
interval
>
5
000
*
tsStatusInterval
)
{
if
(
pDnode
->
rebootTime
>
0
)
{
pDnode
->
offlineReason
=
DND_REASON_STATUS_MSG_TIMEOUT
;
}
...
...
@@ -488,7 +484,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SRpcMsg *pReq, SCreateDnodeReq *pC
memcpy
(
dnodeObj
.
fqdn
,
pCreate
->
fqdn
,
TSDB_FQDN_LEN
);
snprintf
(
dnodeObj
.
ep
,
TSDB_EP_LEN
,
"%s:%u"
,
dnodeObj
.
fqdn
,
dnodeObj
.
port
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_DNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_GLOBAL
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"dnode:%s, failed to create since %s"
,
dnodeObj
.
ep
,
terrstr
());
return
-
1
;
...
...
@@ -564,7 +560,7 @@ CREATE_DNODE_OVER:
}
static
int32_t
mndDropDnode
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDnodeObj
*
pDnode
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_DNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_GLOBAL
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"dnode:%d, failed to drop since %s"
,
pDnode
->
id
,
terrstr
());
return
-
1
;
...
...
@@ -617,7 +613,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
pMObj
=
mndAcquireMnode
(
pMnode
,
dropReq
.
dnodeId
);
if
(
pMObj
!=
NULL
)
{
terrno
=
TSDB_CODE_MND_MNODE_
DEPLOYED
;
terrno
=
TSDB_CODE_MND_MNODE_
NOT_EXIST
;
goto
DROP_DNODE_OVER
;
}
...
...
source/dnode/mnode/impl/src/mndFunc.c
浏览文件 @
648e0ac1
...
...
@@ -215,7 +215,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCre
}
memcpy
(
func
.
pCode
,
pCreate
->
pCode
,
func
.
codeSize
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_FUNC
,
pReq
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create func:%s"
,
pTrans
->
id
,
pCreate
->
name
);
...
...
@@ -245,7 +245,7 @@ _OVER:
static
int32_t
mndDropFunc
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SFuncObj
*
pFunc
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_FUNC
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop user:%s"
,
pTrans
->
id
,
pFunc
->
name
);
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
648e0ac1
...
...
@@ -369,7 +369,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
mError
(
"failed to process sync msg:%p type:%s since %s"
,
pMsg
,
TMSG_INFO
(
pMsg
->
msgType
),
terrstr
());
return
TAOS_SYNC_PROPOSE_OTHER_ERROR
;
}
char
logBuf
[
512
]
=
{
0
};
char
*
syncNodeStr
=
sync2SimpleStr
(
pMgmt
->
sync
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==vnodeProcessSyncReq== msgType:%d, syncNode: %s"
,
pMsg
->
msgType
,
syncNodeStr
);
...
...
@@ -472,7 +472,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
}
else
if
(
code
==
0
)
{
mTrace
(
"msg:%p, successfully processed and response"
,
pMsg
);
}
else
{
m
Debug
(
"msg:%p, failed to process since %s, app:%p type:%s"
,
pMsg
,
terrstr
(),
pMsg
->
info
.
ahandle
,
m
Error
(
"msg:%p, failed to process since %s, app:%p type:%s"
,
pMsg
,
terrstr
(),
pMsg
->
info
.
ahandle
,
TMSG_INFO
(
pMsg
->
msgType
));
}
...
...
@@ -686,4 +686,4 @@ void mndReleaseSyncRef(SMnode *pMnode) {
int32_t
ref
=
atomic_sub_fetch_32
(
&
pMnode
->
syncRef
,
1
);
mTrace
(
"mnode sync is released, ref:%d"
,
ref
);
taosThreadRwlockUnlock
(
&
pMnode
->
lock
);
}
\ No newline at end of file
}
source/dnode/mnode/impl/src/mndMnode.c
浏览文件 @
648e0ac1
...
...
@@ -18,9 +18,9 @@
#include "mndAuth.h"
#include "mndDnode.h"
#include "mndShow.h"
#include "mndSync.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndSync.h"
#define MNODE_VER_NUMBER 1
#define MNODE_RESERVE_SIZE 64
...
...
@@ -92,10 +92,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
mDebug
(
"mnode:%d, will be created when deploying, raw:%p"
,
mnodeObj
.
id
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_DNODE
,
NULL
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_GLOBAL
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"mnode:%d, failed to create since %s"
,
mnodeObj
.
id
,
terrstr
());
return
-
1
;
...
...
@@ -117,7 +114,6 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
SSdbRaw
*
mndMnodeActionEncode
(
SMnodeObj
*
pObj
)
{
...
...
@@ -363,11 +359,11 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
mnodeObj
.
createdTime
=
taosGetTimestampMs
();
mnodeObj
.
updateTime
=
mnodeObj
.
createdTime
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_CREATE_MNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_GLOBAL
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create mnode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
mndTransSet
NoParalle
l
(
pTrans
);
mndTransSet
Seria
l
(
pTrans
);
if
(
mndSetCreateMnodeRedoLogs
(
pMnode
,
pTrans
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateMnodeCommitLogs
(
pMnode
,
pTrans
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateMnodeRedoActions
(
pMnode
,
pTrans
,
pDnode
,
&
mnodeObj
)
!=
0
)
goto
_OVER
;
...
...
@@ -396,6 +392,11 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
mDebug
(
"mnode:%d, start to create"
,
createReq
.
dnodeId
);
if
(
sdbGetSize
(
pMnode
->
pSdb
,
SDB_MNODE
)
>=
3
)
{
terrno
=
TSDB_CODE_MND_TOO_MANY_MNODES
;
goto
_OVER
;
}
pObj
=
mndAcquireMnode
(
pMnode
,
createReq
.
dnodeId
);
if
(
pObj
!=
NULL
)
{
terrno
=
TSDB_CODE_MND_MNODE_ALREADY_EXIST
;
...
...
@@ -535,11 +536,11 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
static
int32_t
mndDropMnode
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SMnodeObj
*
pObj
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_DROP_MNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_GLOBAL
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop mnode:%d"
,
pTrans
->
id
,
pObj
->
id
);
mndTransSet
NoParalle
l
(
pTrans
);
mndTransSet
Seria
l
(
pTrans
);
if
(
mndSetDropMnodeRedoLogs
(
pMnode
,
pTrans
,
pObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropMnodeCommitLogs
(
pMnode
,
pTrans
,
pObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetDropMnodeRedoActions
(
pMnode
,
pTrans
,
pObj
->
pDnode
,
pObj
)
!=
0
)
goto
_OVER
;
...
...
@@ -632,6 +633,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int32_t
cols
=
0
;
SMnodeObj
*
pObj
=
NULL
;
char
*
pWrite
;
int64_t
curMs
=
taosGetTimestampMs
();
while
(
numOfRows
<
rows
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_MNODE
,
pShow
->
pIter
,
(
void
**
)
&
pObj
);
...
...
@@ -647,11 +649,16 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
b1
,
false
);
bool
online
=
mndIsDnodeOnline
(
pMnode
,
pObj
->
pDnode
,
curMs
);
const
char
*
roles
=
NULL
;
if
(
pObj
->
id
==
pMnode
->
selfDnodeId
)
{
roles
=
syncStr
(
TAOS_SYNC_STATE_LEADER
);
}
else
{
roles
=
syncStr
(
pObj
->
state
);
if
(
!
online
)
{
roles
=
"OFFLINE"
;
}
else
{
roles
=
syncStr
(
pObj
->
state
);
}
}
char
*
b2
=
taosMemoryCalloc
(
1
,
12
+
VARSTR_HEADER_SIZE
);
STR_WITH_MAXSIZE_TO_VARSTR
(
b2
,
roles
,
pShow
->
pMeta
->
pSchemas
[
cols
].
bytes
);
...
...
source/dnode/mnode/impl/src/mndOffset.c
浏览文件 @
648e0ac1
...
...
@@ -179,7 +179,7 @@ static int32_t mndProcessCommitOffsetReq(SRpcMsg *pMsg) {
tDecodeSMqCMCommitOffsetReq
(
&
decoder
,
&
commitOffsetReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_COMMIT_OFFSET
,
pMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pMsg
);
for
(
int32_t
i
=
0
;
i
<
commitOffsetReq
.
num
;
i
++
)
{
SMqOffset
*
pOffset
=
&
commitOffsetReq
.
offsets
[
i
];
...
...
source/dnode/mnode/impl/src/mndQnode.c
浏览文件 @
648e0ac1
...
...
@@ -248,7 +248,7 @@ static int32_t mndCreateQnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
qnodeObj
.
createdTime
=
taosGetTimestampMs
();
qnodeObj
.
updateTime
=
qnodeObj
.
createdTime
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_QNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create qnode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
...
...
@@ -365,7 +365,7 @@ static int32_t mndSetDropQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQn
static
int32_t
mndDropQnode
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SQnodeObj
*
pObj
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_DROP_QNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop qnode:%d"
,
pTrans
->
id
,
pObj
->
id
);
...
...
source/dnode/mnode/impl/src/mndSma.c
浏览文件 @
648e0ac1
...
...
@@ -508,12 +508,12 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
streamObj
.
fixedSinkVgId
=
smaObj
.
dstVgId
;
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_CREATE_SMA
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_DB
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create sma:%s"
,
pTrans
->
id
,
pCreate
->
name
);
mndTransSetDbInfo
(
pTrans
,
pDb
);
mndTransSet
NoParalle
l
(
pTrans
);
mndTransSet
Seria
l
(
pTrans
);
if
(
mndSetCreateSmaRedoLogs
(
pMnode
,
pTrans
,
&
smaObj
)
!=
0
)
goto
_OVER
;
if
(
mndSetCreateSmaVgroupRedoLogs
(
pMnode
,
pTrans
,
&
streamObj
.
fixedSinkVg
)
!=
0
)
goto
_OVER
;
...
...
@@ -753,7 +753,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
pVgroup
=
mndAcquireVgroup
(
pMnode
,
pSma
->
dstVgId
);
if
(
pVgroup
==
NULL
)
goto
_OVER
;
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_SMA
,
pReq
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_DB
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop sma:%s"
,
pTrans
->
id
,
pSma
->
name
);
...
...
source/dnode/mnode/impl/src/mndSnode.c
浏览文件 @
648e0ac1
...
...
@@ -253,7 +253,7 @@ static int32_t mndCreateSnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
snodeObj
.
createdTime
=
taosGetTimestampMs
();
snodeObj
.
updateTime
=
snodeObj
.
createdTime
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_SNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create snode:%d"
,
pTrans
->
id
,
pCreate
->
dnodeId
);
...
...
@@ -372,7 +372,7 @@ static int32_t mndSetDropSnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SSn
static
int32_t
mndDropSnode
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SSnodeObj
*
pObj
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_DROP_SNODE
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop snode:%d"
,
pTrans
->
id
,
pObj
->
id
);
...
...
source/dnode/mnode/impl/src/mndStb.c
浏览文件 @
648e0ac1
...
...
@@ -735,7 +735,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SRpcMsg *pReq, SMCreateStbReq *pCrea
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_STB
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_DB_INSIDE
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to create stb:%s"
,
pTrans
->
id
,
pCreate
->
name
);
...
...
@@ -1257,7 +1257,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
if
(
code
!=
0
)
goto
_OVER
;
code
=
-
1
;
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
TYPE_ALTER_STB
,
pReq
);
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_
CONFLICT_DB_INSIDE
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to alter stb:%s"
,
pTrans
->
id
,
pAlter
->
name
);
...
...
@@ -1403,7 +1403,7 @@ static int32_t mndSetDropStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
static
int32_t
mndDropStb
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SDbObj
*
pDb
,
SStbObj
*
pStb
)
{
int32_t
code
=
-
1
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_STB
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_DB_INSIDE
,
pReq
);
if
(
pTrans
==
NULL
)
goto
_OVER
;
mDebug
(
"trans:%d, used to drop stb:%s"
,
pTrans
->
id
,
pStb
->
name
);
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
648e0ac1
...
...
@@ -402,7 +402,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SRpcMsg *pReq, SCMCreateStreamReq
tstrncpy
(
streamObj
.
targetDb
,
pDb
->
name
,
TSDB_DB_FNAME_LEN
);
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_STREAM
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"stream:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
648e0ac1
...
...
@@ -394,8 +394,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
mInfo
(
"rebalance calculation completed, rebalanced vg:"
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pOutput
->
rebVgs
);
i
++
)
{
SMqRebOutputVg
*
pOutputRebVg
=
taosArrayGet
(
pOutput
->
rebVgs
,
i
);
mInfo
(
"vg
: %d moved from consumer %ld to consumer %ld"
,
pOutputRebVg
->
pVgEp
->
vgId
,
pOutputRebVg
->
oldConsumer
Id
,
pOutputRebVg
->
newConsumerId
);
mInfo
(
"vg
Id:%d moved from consumer %"
PRId64
" to consumer %"
PRId64
,
pOutputRebVg
->
pVgEp
->
vg
Id
,
pOutputRebVg
->
oldConsumerId
,
pOutputRebVg
->
newConsumerId
);
}
// 9. clear
...
...
@@ -405,10 +405,9 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
}
static
int32_t
mndPersistRebResult
(
SMnode
*
pMnode
,
SRpcMsg
*
pMsg
,
const
SMqRebOutputObj
*
pOutput
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_REBALANCE
,
pMsg
);
if
(
pTrans
==
NULL
)
{
return
-
1
;
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_CONFLICT_NOTHING
,
pMsg
);
if
(
pTrans
==
NULL
)
return
-
1
;
// make txn:
// 1. redo action: action to all vg
const
SArray
*
rebVgs
=
pOutput
->
rebVgs
;
...
...
@@ -625,7 +624,7 @@ static int32_t mndProcessDropCgroupReq(SRpcMsg *pReq) {
return
-
1
;
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_CGROUP
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"cgroup: %s on topic:%s, failed to drop since %s"
,
dropReq
.
cgroup
,
dropReq
.
topic
,
terrstr
());
mndReleaseSubscribe
(
pMnode
,
pSub
);
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
648e0ac1
...
...
@@ -383,7 +383,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
/*topicObj.withSchema = 1;*/
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_TOPIC
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
taosMemoryFreeClear
(
topicObj
.
ast
);
...
...
@@ -551,7 +551,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
}
#endif
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_TOPIC
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"topic:%s, failed to drop since %s"
,
pTopic
->
name
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
648e0ac1
此差异已折叠。
点击以展开。
source/dnode/mnode/impl/src/mndUser.c
浏览文件 @
648e0ac1
...
...
@@ -79,10 +79,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
mDebug
(
"user:%s, will be created when deploying, raw:%p"
,
userObj
.
user
,
pRaw
);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_TYPE_CREATE_USER
,
NULL
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
TRN_CONFLICT_NOTHING
,
NULL
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to create since %s"
,
userObj
.
user
,
terrstr
());
return
-
1
;
...
...
@@ -104,7 +101,6 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
mndTransDrop
(
pTrans
);
return
0
;
#endif
}
static
int32_t
mndCreateDefaultUsers
(
SMnode
*
pMnode
)
{
...
...
@@ -291,7 +287,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
userObj
.
updateTime
=
userObj
.
createdTime
;
userObj
.
superUser
=
pCreate
->
superUser
;
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_USER
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to create since %s"
,
pCreate
->
user
,
terrstr
());
return
-
1
;
...
...
@@ -371,7 +367,7 @@ _OVER:
}
static
int32_t
mndAlterUser
(
SMnode
*
pMnode
,
SUserObj
*
pOld
,
SUserObj
*
pNew
,
SRpcMsg
*
pReq
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_ALTER_USER
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to alter since %s"
,
pOld
->
user
,
terrstr
());
return
-
1
;
...
...
@@ -578,7 +574,7 @@ _OVER:
}
static
int32_t
mndDropUser
(
SMnode
*
pMnode
,
SRpcMsg
*
pReq
,
SUserObj
*
pUser
)
{
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_DROP_USER
,
pReq
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
CONFLICT_NOTHING
,
pReq
);
if
(
pTrans
==
NULL
)
{
mError
(
"user:%s, failed to drop since %s"
,
pUser
->
user
,
terrstr
());
return
-
1
;
...
...
source/dnode/mnode/impl/test/trans/trans2.cpp
浏览文件 @
648e0ac1
...
...
@@ -11,6 +11,8 @@
#include <gtest/gtest.h>
#if 0
#include "mndTrans.h"
#include "mndUser.h"
#include "tcache.h"
...
...
@@ -103,7 +105,7 @@ class MndTestTrans2 : public ::testing::Test {
void SetUp() override {}
void TearDown() override {}
int32_t
CreateUserLog
(
const
char
*
acct
,
const
char
*
user
,
ETrn
Type
type
,
SDbObj
*
pDb
)
{
int32_t CreateUserLog(const char *acct, const char *user, ETrn
Conflct conflict
, SDbObj *pDb) {
SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN);
...
...
@@ -113,7 +115,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
type
,
&
rpcMsg
);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,
conflict
, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
...
...
@@ -135,7 +137,7 @@ class MndTestTrans2 : public ::testing::Test {
return code;
}
int32_t
CreateUserAction
(
const
char
*
acct
,
const
char
*
user
,
bool
hasUndoAction
,
ETrnPolicy
policy
,
ETrn
Type
type
,
int32_t CreateUserAction(const char *acct, const char *user, bool hasUndoAction, ETrnPolicy policy, ETrn
Conflct conflict
,
SDbObj *pDb) {
SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
...
...
@@ -146,7 +148,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
policy
,
type
,
&
rpcMsg
);
STrans *pTrans = mndTransCreate(pMnode, policy,
conflict
, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
...
...
@@ -218,7 +220,7 @@ class MndTestTrans2 : public ::testing::Test {
userObj.superUser = 1;
SRpcMsg rpcMsg = {0};
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_
TYPE_CREATE_USER
,
&
rpcMsg
);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_
CONFLICT_NOTHING
, &rpcMsg);
SSdbRaw *pRedoRaw = mndUserActionEncode(&userObj);
mndTransAppendRedolog(pTrans, pRedoRaw);
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
...
...
@@ -528,3 +530,5 @@ TEST_F(MndTestTrans2, 04_Conflict) {
mndReleaseUser(pMnode, pUser);
}
}
#endif
\ No newline at end of file
source/dnode/mnode/impl/test/user/CMakeLists.txt
浏览文件 @
648e0ac1
...
...
@@ -5,7 +5,9 @@ target_link_libraries(
PUBLIC sut
)
add_test
(
NAME userTest
COMMAND userTest
)
if
(
NOT TD_WINDOWS
)
add_test
(
NAME userTest
COMMAND userTest
)
endif
(
NOT TD_WINDOWS
)
source/dnode/vnode/CMakeLists.txt
浏览文件 @
648e0ac1
...
...
@@ -52,10 +52,11 @@ target_sources(
# tq
"src/tq/tq.c"
"src/tq/tqExec.c"
"src/tq/tqCommit.c"
"src/tq/tqOffset.c"
"src/tq/tqPush.c"
"src/tq/tqMeta.c"
"src/tq/tqRead.c"
"src/tq/tqOffset.c"
#"src/tq/tqPush.c"
#"src/tq/tqCommit.c"
)
target_include_directories
(
vnode
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
648e0ac1
...
...
@@ -66,12 +66,12 @@ struct STqReadHandle {
// tqPush
typedef
struct
{
int64_t
consumerId
;
int32_t
epoch
;
int32_t
skipLogNum
;
int64_t
reqOffset
;
SR
WLatch
lock
;
SR
pcMsg
*
handle
;
int64_t
consumerId
;
int32_t
epoch
;
int32_t
skipLogNum
;
int64_t
reqOffset
;
SR
pcHandleInfo
info
;
SR
WLatch
lock
;
}
STqPushHandle
;
#if 0
...
...
@@ -168,6 +168,13 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
int32_t
tqDataExec
(
STQ
*
pTq
,
STqExecHandle
*
pExec
,
SSubmitReq
*
pReq
,
SMqDataBlkRsp
*
pRsp
,
int32_t
workerId
);
// tqMeta
int32_t
tqMetaOpen
(
STQ
*
pTq
);
int32_t
tqMetaClose
(
STQ
*
pTq
);
int32_t
tqMetaSaveHandle
(
STQ
*
pTq
,
const
char
*
key
,
const
STqHandle
*
pHandle
);
int32_t
tqMetaDeleteHandle
(
STQ
*
pTq
,
const
char
*
key
);
// tqOffset
STqOffsetStore
*
STqOffsetOpen
(
STqOffsetCfg
*
);
void
STqOffsetClose
(
STqOffsetStore
*
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
648e0ac1
...
...
@@ -14,7 +14,6 @@
*/
#include "tq.h"
#include "tdbInt.h"
int32_t
tqInit
()
{
int8_t
old
;
...
...
@@ -47,51 +46,6 @@ void tqCleanUp() {
}
}
int
tqExecKeyCompare
(
const
void
*
pKey1
,
int32_t
kLen1
,
const
void
*
pKey2
,
int32_t
kLen2
)
{
return
strcmp
(
pKey1
,
pKey2
);
}
int32_t
tqStoreHandle
(
STQ
*
pTq
,
const
char
*
key
,
const
STqHandle
*
pHandle
)
{
int32_t
code
;
int32_t
vlen
;
tEncodeSize
(
tEncodeSTqHandle
,
pHandle
,
vlen
,
code
);
ASSERT
(
code
==
0
);
void
*
buf
=
taosMemoryCalloc
(
1
,
vlen
);
if
(
buf
==
NULL
)
{
ASSERT
(
0
);
}
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
buf
,
vlen
);
if
(
tEncodeSTqHandle
(
&
encoder
,
pHandle
)
<
0
)
{
ASSERT
(
0
);
}
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbBegin
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbUpsert
(
pTq
->
pExecStore
,
key
,
(
int
)
strlen
(
key
),
buf
,
vlen
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbCommit
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
tEncoderClear
(
&
encoder
);
taosMemoryFree
(
buf
);
return
0
;
}
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
)
{
STQ
*
pTq
=
taosMemoryMalloc
(
sizeof
(
STQ
));
if
(
pTq
==
NULL
)
{
...
...
@@ -108,60 +62,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq
->
pushMgr
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
tdbOpen
(
path
,
16
*
1024
,
1
,
&
pTq
->
pMetaStore
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbOpen
(
"handles"
,
-
1
,
-
1
,
tqExecKeyCompare
,
pTq
->
pMetaStore
,
&
pTq
->
pExecStore
)
<
0
)
{
ASSERT
(
0
);
}
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
0
)
<
0
)
{
ASSERT
(
0
);
}
TBC
*
pCur
;
if
(
tdbTbcOpen
(
pTq
->
pExecStore
,
&
pCur
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
void
*
pKey
;
int
kLen
;
void
*
pVal
;
int
vLen
;
tdbTbcMoveToFirst
(
pCur
);
SDecoder
decoder
;
while
(
tdbTbcNext
(
pCur
,
&
pKey
,
&
kLen
,
&
pVal
,
&
vLen
)
==
0
)
{
STqHandle
handle
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
pVal
,
vLen
);
tDecodeSTqHandle
(
&
decoder
,
&
handle
);
handle
.
pWalReader
=
walOpenReadHandle
(
pTq
->
pVnode
->
pWal
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
handle
.
execHandle
.
pExecReader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
}
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
SReadHandle
reader
=
{
.
reader
=
handle
.
execHandle
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
handle
.
execHandle
.
exec
.
execCol
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
handle
.
execHandle
.
exec
.
execCol
.
qmsg
,
&
reader
);
ASSERT
(
handle
.
execHandle
.
exec
.
execCol
.
task
[
i
]);
}
}
else
{
handle
.
execHandle
.
exec
.
execDb
.
pFilterOutTbUid
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
}
taosHashPut
(
pTq
->
handles
,
pKey
,
kLen
,
&
handle
,
sizeof
(
STqHandle
));
}
if
(
tdbTxnClose
(
&
txn
)
<
0
)
{
if
(
tqMetaOpen
(
pTq
)
<
0
)
{
ASSERT
(
0
);
}
...
...
@@ -174,46 +75,12 @@ void tqClose(STQ* pTq) {
taosHashCleanup
(
pTq
->
handles
);
taosHashCleanup
(
pTq
->
pStreamTasks
);
taosHashCleanup
(
pTq
->
pushMgr
);
t
dbClose
(
pTq
->
pMetaStore
);
t
qMetaClose
(
pTq
);
taosMemoryFree
(
pTq
);
}
// TODO
}
#if 0
int32_t tEncodeSTqExec(SEncoder* pEncoder, const STqExec* pExec) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeCStr(pEncoder, pExec->subKey) < 0) return -1;
if (tEncodeI64(pEncoder, pExec->consumerId) < 0) return -1;
if (tEncodeI32(pEncoder, pExec->epoch) < 0) return -1;
if (tEncodeI8(pEncoder, pExec->subType) < 0) return -1;
/*if (tEncodeI8(pEncoder, pExec->withTbName) < 0) return -1;*/
/*if (tEncodeI8(pEncoder, pExec->withSchema) < 0) return -1;*/
/*if (tEncodeI8(pEncoder, pExec->withTag) < 0) return -1;*/
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tEncodeCStr(pEncoder, pExec->qmsg) < 0) return -1;
}
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeSTqExec(SDecoder* pDecoder, STqExec* pExec) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pExec->subKey) < 0) return -1;
if (tDecodeI64(pDecoder, &pExec->consumerId) < 0) return -1;
if (tDecodeI32(pDecoder, &pExec->epoch) < 0) return -1;
if (tDecodeI8(pDecoder, &pExec->subType) < 0) return -1;
/*if (tDecodeI8(pDecoder, &pExec->withTbName) < 0) return -1;*/
/*if (tDecodeI8(pDecoder, &pExec->withSchema) < 0) return -1;*/
/*if (tDecodeI8(pDecoder, &pExec->withTag) < 0) return -1;*/
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tDecodeCStrAlloc(pDecoder, &pExec->qmsg) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
}
#endif
int32_t
tEncodeSTqHandle
(
SEncoder
*
pEncoder
,
const
STqHandle
*
pHandle
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pHandle
->
subKey
)
<
0
)
return
-
1
;
...
...
@@ -290,9 +157,6 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
taosWLockLatch
(
&
pHandle
->
pushHandle
.
lock
);
SRpcMsg
*
pMsg
=
atomic_load_ptr
(
&
pHandle
->
pushHandle
.
handle
);
ASSERT
(
pMsg
);
SMqDataBlkRsp
rsp
=
{
0
};
rsp
.
reqOffset
=
pHandle
->
pushHandle
.
reqOffset
;
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
...
...
@@ -318,7 +182,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
int32_t
tlen
=
sizeof
(
SMqRspHead
)
+
tEncodeSMqDataBlkRsp
(
NULL
,
&
rsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
pMsg
->
code
=
-
1
;
// todo free
return
-
1
;
}
...
...
@@ -329,10 +193,15 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
tEncodeSMqDataBlkRsp
(
&
abuf
,
&
rsp
);
SRpcMsg
resp
=
{.
info
=
handleInfo
,
.
pCont
=
buf
,
.
contLen
=
tlen
,
.
code
=
0
};
SRpcMsg
resp
=
{
.
info
=
pHandle
->
pushHandle
.
info
,
.
pCont
=
buf
,
.
contLen
=
tlen
,
.
code
=
0
,
};
tmsgSendRsp
(
&
resp
);
atomic_store_ptr
(
&
pHandle
->
pushHandle
.
handle
,
NULL
);
memset
(
&
pHandle
->
pushHandle
.
info
,
0
,
sizeof
(
SRpcHandleInfo
)
);
taosWUnLockLatch
(
&
pHandle
->
pushHandle
.
lock
);
tqDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld"
,
...
...
@@ -374,7 +243,7 @@ int tqCommit(STQ* pTq) {
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
)
{
SMqPollReq
*
pReq
=
pMsg
->
pCont
;
int64_t
consumerId
=
pReq
->
consumerId
;
int64_t
waitTime
=
pReq
->
waitTime
;
int64_t
waitTime
=
pReq
->
timeout
;
int32_t
reqEpoch
=
pReq
->
epoch
;
int64_t
fetchOffset
;
...
...
@@ -410,24 +279,22 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rsp
.
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
rsp
.
blockSchema
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rsp
.
blockTbName
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rsp
.
withTbName
=
pReq
->
withTbName
;
if
(
rsp
.
withTbName
)
{
rsp
.
blockTbName
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
}
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
rsp
.
withSchema
=
false
;
rsp
.
withTag
=
false
;
}
else
{
rsp
.
withSchema
=
true
;
rsp
.
blockSchema
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
rsp
.
withTag
=
false
;
}
/*int8_t withTbName = pExec->withTbName;*/
/*if (pReq->withTbName != -1) {*/
/*withTbName = pReq->withTbName;*/
/*}*/
/*rsp.withTbName = withTbName;*/
while
(
1
)
{
consumerEpoch
=
atomic_load_32
(
&
pHandle
->
epoch
);
if
(
consumerEpoch
>
reqEpoch
)
{
...
...
@@ -443,15 +310,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SWalReadHead
*
pHead
=
&
pHeadWithCkSum
->
head
;
#if 0
SWalReadHead* pHead;
if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), fetchOffset);
#if 0
// add to pushMgr
taosWLockLatch(&pExec->pushHandle.lock);
...
...
@@ -473,10 +331,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0;
#endif
break;
}
#endif
tqDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
pHead
->
msgType
);
...
...
@@ -533,8 +387,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO wrap in destroy func
taosArrayDestroy
(
rsp
.
blockData
);
taosArrayDestroy
(
rsp
.
blockDataLen
);
taosArrayDestroyP
(
rsp
.
blockSchema
,
(
FDelete
)
tDeleteSSchemaWrapper
);
taosArrayDestroyP
(
rsp
.
blockTbName
,
(
FDelete
)
taosMemoryFree
);
if
(
rsp
.
withSchema
)
{
taosArrayDestroyP
(
rsp
.
blockSchema
,
(
FDelete
)
tDeleteSSchemaWrapper
);
}
if
(
rsp
.
withTbName
)
{
taosArrayDestroyP
(
rsp
.
blockTbName
,
(
FDelete
)
taosMemoryFree
);
}
return
0
;
}
...
...
@@ -545,24 +405,9 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t
code
=
taosHashRemove
(
pTq
->
handles
,
pReq
->
subKey
,
strlen
(
pReq
->
subKey
));
ASSERT
(
code
==
0
);
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbBegin
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
if
(
tqMetaDeleteHandle
(
pTq
,
pReq
->
subKey
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbDelete
(
pTq
->
pExecStore
,
pReq
->
subKey
,
(
int
)
strlen
(
pReq
->
subKey
),
&
txn
)
<
0
)
{
/*ASSERT(0);*/
}
if
(
tdbCommit
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
return
0
;
}
...
...
@@ -620,7 +465,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
atomic_add_fetch_32
(
&
pHandle
->
epoch
,
1
);
}
if
(
tq
Stor
eHandle
(
pTq
,
req
.
subKey
,
pHandle
)
<
0
)
{
if
(
tq
MetaSav
eHandle
(
pTq
,
req
.
subKey
,
pHandle
)
<
0
)
{
// TODO
}
return
0
;
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
648e0ac1
...
...
@@ -12,3 +12,137 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
#include "tq.h"
int
tqExecKeyCompare
(
const
void
*
pKey1
,
int32_t
kLen1
,
const
void
*
pKey2
,
int32_t
kLen2
)
{
return
strcmp
(
pKey1
,
pKey2
);
}
int32_t
tqMetaOpen
(
STQ
*
pTq
)
{
if
(
tdbOpen
(
pTq
->
path
,
16
*
1024
,
1
,
&
pTq
->
pMetaStore
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbOpen
(
"handles"
,
-
1
,
-
1
,
tqExecKeyCompare
,
pTq
->
pMetaStore
,
&
pTq
->
pExecStore
)
<
0
)
{
ASSERT
(
0
);
}
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
0
)
<
0
)
{
ASSERT
(
0
);
}
TBC
*
pCur
;
if
(
tdbTbcOpen
(
pTq
->
pExecStore
,
&
pCur
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
void
*
pKey
;
int
kLen
;
void
*
pVal
;
int
vLen
;
tdbTbcMoveToFirst
(
pCur
);
SDecoder
decoder
;
while
(
tdbTbcNext
(
pCur
,
&
pKey
,
&
kLen
,
&
pVal
,
&
vLen
)
==
0
)
{
STqHandle
handle
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
pVal
,
vLen
);
tDecodeSTqHandle
(
&
decoder
,
&
handle
);
handle
.
pWalReader
=
walOpenReadHandle
(
pTq
->
pVnode
->
pWal
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
handle
.
execHandle
.
pExecReader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
}
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
SReadHandle
reader
=
{
.
reader
=
handle
.
execHandle
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
handle
.
execHandle
.
exec
.
execCol
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
handle
.
execHandle
.
exec
.
execCol
.
qmsg
,
&
reader
);
ASSERT
(
handle
.
execHandle
.
exec
.
execCol
.
task
[
i
]);
}
}
else
{
handle
.
execHandle
.
exec
.
execDb
.
pFilterOutTbUid
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
}
taosHashPut
(
pTq
->
handles
,
pKey
,
kLen
,
&
handle
,
sizeof
(
STqHandle
));
}
if
(
tdbTxnClose
(
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
return
0
;
}
int32_t
tqMetaClose
(
STQ
*
pTq
)
{
tdbClose
(
pTq
->
pMetaStore
);
return
0
;
}
int32_t
tqMetaSaveHandle
(
STQ
*
pTq
,
const
char
*
key
,
const
STqHandle
*
pHandle
)
{
int32_t
code
;
int32_t
vlen
;
tEncodeSize
(
tEncodeSTqHandle
,
pHandle
,
vlen
,
code
);
ASSERT
(
code
==
0
);
void
*
buf
=
taosMemoryCalloc
(
1
,
vlen
);
if
(
buf
==
NULL
)
{
ASSERT
(
0
);
}
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
buf
,
vlen
);
if
(
tEncodeSTqHandle
(
&
encoder
,
pHandle
)
<
0
)
{
ASSERT
(
0
);
}
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbBegin
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbUpsert
(
pTq
->
pExecStore
,
key
,
(
int
)
strlen
(
key
),
buf
,
vlen
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbCommit
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
tEncoderClear
(
&
encoder
);
taosMemoryFree
(
buf
);
return
0
;
}
int32_t
tqMetaDeleteHandle
(
STQ
*
pTq
,
const
char
*
key
)
{
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbBegin
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbDelete
(
pTq
->
pExecStore
,
key
,
(
int
)
strlen
(
key
),
&
txn
)
<
0
)
{
/*ASSERT(0);*/
}
if
(
tdbCommit
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
return
0
;
}
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
648e0ac1
...
...
@@ -12,3 +12,5 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tq.h"
source/dnode/vnode/test/tsdbSmaTest.cpp
浏览文件 @
648e0ac1
...
...
@@ -368,7 +368,7 @@ TEST(testCase, tSma_Data_Insert_Query_Test) {
SDiskCfg
pDisks
=
{
0
};
pDisks
.
level
=
0
;
pDisks
.
primary
=
1
;
strncpy
(
pDisks
.
dir
,
"/var/lib/taos"
,
TSDB_FILENAME_LEN
);
strncpy
(
pDisks
.
dir
,
TD_DATA_DIR_PATH
,
TSDB_FILENAME_LEN
);
int32_t
numOfDisks
=
1
;
pTsdb
->
pTfs
=
tfsOpen
(
&
pDisks
,
numOfDisks
);
EXPECT_NE
(
pTsdb
->
pTfs
,
nullptr
);
...
...
source/libs/catalog/test/catalogTests.cpp
浏览文件 @
648e0ac1
...
...
@@ -137,7 +137,7 @@ void ctgTestInitLogFile() {
tsAsyncLog
=
0
;
qDebugFlag
=
159
;
strcpy
(
tsLogDir
,
"/var/log/taos"
);
strcpy
(
tsLogDir
,
TD_LOG_DIR_PATH
);
ctgdEnableDebug
(
"api"
);
ctgdEnableDebug
(
"meta"
);
...
...
source/libs/function/src/builtinsimpl.c
浏览文件 @
648e0ac1
...
...
@@ -1646,8 +1646,8 @@ bool leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInf
pInfo
->
startVal
=
IS_FLOAT_TYPE
(
pCtx
->
param
[
1
].
param
.
nType
)
?
pCtx
->
param
[
1
].
param
.
d
:
(
double
)
pCtx
->
param
[
1
].
param
.
i
;
pInfo
->
stepVal
=
IS_FLOAT_TYPE
(
pCtx
->
param
[
1
].
param
.
nType
)
?
pCtx
->
param
[
2
].
param
.
d
:
(
double
)
pCtx
->
param
[
1
].
param
.
i
;
pInfo
->
stepVal
=
IS_FLOAT_TYPE
(
pCtx
->
param
[
2
].
param
.
nType
)
?
pCtx
->
param
[
2
].
param
.
d
:
(
double
)
pCtx
->
param
[
2
].
param
.
i
;
return
true
;
}
...
...
source/libs/parser/src/parInsert.c
浏览文件 @
648e0ac1
...
...
@@ -1525,14 +1525,14 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
SKvParam
param
=
{.
builder
=
&
tagBuilder
};
for
(
int
c
=
0
;
c
<
tags
->
numOfBound
;
++
c
)
{
SSchema
*
pTagSchema
=
&
pSchema
[
tags
->
boundColumns
[
c
]];
param
.
schema
=
pTagSchema
;
if
(
bind
[
c
].
is_null
&&
bind
[
c
].
is_null
[
0
])
{
KvRowAppend
(
&
pBuf
,
NULL
,
0
,
&
param
);
continue
;
}
SSchema
*
pTagSchema
=
&
pSchema
[
tags
->
boundColumns
[
c
]];
param
.
schema
=
pTagSchema
;
int32_t
colLen
=
pTagSchema
->
bytes
;
if
(
IS_VAR_DATA_TYPE
(
pTagSchema
->
type
))
{
colLen
=
bind
[
c
].
length
[
0
];
...
...
@@ -1724,18 +1724,23 @@ int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBu
return
TSDB_CODE_SUCCESS
;
}
int32_t
buildBoundFields
(
SParsedDataColInfo
*
boundInfo
,
SSchema
*
pSchema
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
)
{
int32_t
buildBoundFields
(
SParsedDataColInfo
*
boundInfo
,
SSchema
*
pSchema
,
int32_t
*
fieldNum
,
TAOS_FIELD
_E
**
fields
,
uint8_t
timePrec
)
{
if
(
fields
)
{
*
fields
=
taosMemoryCalloc
(
boundInfo
->
numOfBound
,
sizeof
(
TAOS_FIELD
));
if
(
NULL
==
*
fields
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
SSchema
*
schema
=
&
pSchema
[
boundInfo
->
boundColumns
[
0
]];
if
(
TSDB_DATA_TYPE_TIMESTAMP
==
schema
->
type
)
{
(
*
fields
)[
0
].
precision
=
timePrec
;
}
for
(
int32_t
i
=
0
;
i
<
boundInfo
->
numOfBound
;
++
i
)
{
SSchema
*
pTagS
chema
=
&
pSchema
[
boundInfo
->
boundColumns
[
i
]];
strcpy
((
*
fields
)[
i
].
name
,
pTagS
chema
->
name
);
(
*
fields
)[
i
].
type
=
pTagS
chema
->
type
;
(
*
fields
)[
i
].
bytes
=
pTagS
chema
->
bytes
;
s
chema
=
&
pSchema
[
boundInfo
->
boundColumns
[
i
]];
strcpy
((
*
fields
)[
i
].
name
,
s
chema
->
name
);
(
*
fields
)[
i
].
type
=
s
chema
->
type
;
(
*
fields
)[
i
].
bytes
=
s
chema
->
bytes
;
}
}
...
...
@@ -1744,7 +1749,7 @@ int32_t buildBoundFields(SParsedDataColInfo* boundInfo, SSchema* pSchema, int32_
return
TSDB_CODE_SUCCESS
;
}
int32_t
qBuildStmtTagFields
(
void
*
pBlock
,
void
*
boundTags
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
)
{
int32_t
qBuildStmtTagFields
(
void
*
pBlock
,
void
*
boundTags
,
int32_t
*
fieldNum
,
TAOS_FIELD
_E
**
fields
)
{
STableDataBlocks
*
pDataBlock
=
(
STableDataBlocks
*
)
pBlock
;
SParsedDataColInfo
*
tags
=
(
SParsedDataColInfo
*
)
boundTags
;
if
(
NULL
==
tags
)
{
...
...
@@ -1759,12 +1764,12 @@ int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TA
return
TSDB_CODE_SUCCESS
;
}
CHECK_CODE
(
buildBoundFields
(
tags
,
pSchema
,
fieldNum
,
fields
));
CHECK_CODE
(
buildBoundFields
(
tags
,
pSchema
,
fieldNum
,
fields
,
0
));
return
TSDB_CODE_SUCCESS
;
}
int32_t
qBuildStmtColFields
(
void
*
pBlock
,
int32_t
*
fieldNum
,
TAOS_FIELD
**
fields
)
{
int32_t
qBuildStmtColFields
(
void
*
pBlock
,
int32_t
*
fieldNum
,
TAOS_FIELD
_E
**
fields
)
{
STableDataBlocks
*
pDataBlock
=
(
STableDataBlocks
*
)
pBlock
;
SSchema
*
pSchema
=
getTableColumnSchema
(
pDataBlock
->
pTableMeta
);
if
(
pDataBlock
->
boundColumnInfo
.
numOfBound
<=
0
)
{
...
...
@@ -1776,7 +1781,7 @@ int32_t qBuildStmtColFields(void* pBlock, int32_t* fieldNum, TAOS_FIELD** fields
return
TSDB_CODE_SUCCESS
;
}
CHECK_CODE
(
buildBoundFields
(
&
pDataBlock
->
boundColumnInfo
,
pSchema
,
fieldNum
,
fields
));
CHECK_CODE
(
buildBoundFields
(
&
pDataBlock
->
boundColumnInfo
,
pSchema
,
fieldNum
,
fields
,
pDataBlock
->
pTableMeta
->
tableInfo
.
precision
));
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/parser/src/parTranslater.c
浏览文件 @
648e0ac1
...
...
@@ -830,7 +830,8 @@ static EDealRes translateComparisonOperator(STranslateContext* pCxt, SOperatorNo
if
(
!
IS_VAR_DATA_TYPE
(((
SExprNode
*
)(
pOp
->
pLeft
))
->
resType
.
type
))
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
((
SExprNode
*
)(
pOp
->
pLeft
))
->
aliasName
);
}
if
(
QUERY_NODE_VALUE
!=
nodeType
(
pOp
->
pRight
)
||
!
IS_STR_DATA_TYPE
(((
SExprNode
*
)(
pOp
->
pRight
))
->
resType
.
type
))
{
if
(
QUERY_NODE_VALUE
!=
nodeType
(
pOp
->
pRight
)
||
((
!
IS_STR_DATA_TYPE
(((
SExprNode
*
)(
pOp
->
pRight
))
->
resType
.
type
))
&&
(((
SExprNode
*
)(
pOp
->
pRight
))
->
resType
.
type
!=
TSDB_DATA_TYPE_NULL
)))
{
return
generateDealNodeErrMsg
(
pCxt
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
((
SExprNode
*
)(
pOp
->
pRight
))
->
aliasName
);
}
}
...
...
source/libs/qworker/test/qworkerTests.cpp
浏览文件 @
648e0ac1
...
...
@@ -108,7 +108,7 @@ void qwtInitLogFile() {
tsAsyncLog
=
0
;
qDebugFlag
=
159
;
strcpy
(
tsLogDir
,
"/var/log/taos"
);
strcpy
(
tsLogDir
,
TD_LOG_DIR_PATH
);
if
(
taosInitLog
(
defaultLogFileNamePrefix
,
maxLogFileNum
)
<
0
)
{
printf
(
"failed to open log file in directory:%s
\n
"
,
tsLogDir
);
...
...
source/libs/scalar/test/filter/filterTests.cpp
浏览文件 @
648e0ac1
...
...
@@ -60,7 +60,7 @@ void flttInitLogFile() {
tsAsyncLog
=
0
;
qDebugFlag
=
159
;
strcpy
(
tsLogDir
,
"/var/log/taos"
);
strcpy
(
tsLogDir
,
TD_LOG_DIR_PATH
);
if
(
taosInitLog
(
defaultLogFileNamePrefix
,
maxLogFileNum
)
<
0
)
{
printf
(
"failed to open log file in directory:%s
\n
"
,
tsLogDir
);
...
...
source/libs/scalar/test/scalar/CMakeLists.txt
浏览文件 @
648e0ac1
...
...
@@ -17,7 +17,9 @@ TARGET_INCLUDE_DIRECTORIES(
PUBLIC
"
${
TD_SOURCE_DIR
}
/source/libs/parser/inc"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/scalar/inc"
)
add_test
(
NAME scalarTest
COMMAND scalarTest
)
if
(
NOT TD_WINDOWS
)
add_test
(
NAME scalarTest
COMMAND scalarTest
)
endif
(
NOT TD_WINDOWS
)
source/libs/scalar/test/scalar/scalarTests.cpp
浏览文件 @
648e0ac1
...
...
@@ -74,7 +74,7 @@ void scltInitLogFile() {
tsAsyncLog
=
0
;
qDebugFlag
=
159
;
strcpy
(
tsLogDir
,
"/var/log/taos"
);
strcpy
(
tsLogDir
,
TD_LOG_DIR_PATH
);
if
(
taosInitLog
(
defaultLogFileNamePrefix
,
maxLogFileNum
)
<
0
)
{
printf
(
"failed to open log file in directory:%s
\n
"
,
tsLogDir
);
...
...
source/libs/scheduler/test/schedulerTests.cpp
浏览文件 @
648e0ac1
...
...
@@ -79,7 +79,7 @@ void schtInitLogFile() {
tsAsyncLog
=
0
;
qDebugFlag
=
159
;
strcpy
(
tsLogDir
,
"/var/log/taos"
);
strcpy
(
tsLogDir
,
TD_LOG_DIR_PATH
);
if
(
taosInitLog
(
defaultLogFileNamePrefix
,
maxLogFileNum
)
<
0
)
{
printf
(
"failed to open log file in directory:%s
\n
"
,
tsLogDir
);
...
...
source/os/CMakeLists.txt
浏览文件 @
648e0ac1
...
...
@@ -10,7 +10,11 @@ target_include_directories(
PUBLIC
"
${
TD_SOURCE_DIR
}
/contrib/msvcregex"
)
# iconv
find_path
(
IconvApiIncludes iconv.h PATHS
)
if
(
TD_WINDOWS
)
find_path
(
IconvApiIncludes iconv.h
"
${
TD_SOURCE_DIR
}
/contrib/iconv"
)
else
()
find_path
(
IconvApiIncludes iconv.h PATHS
)
endif
(
TD_WINDOWS
)
if
(
NOT IconvApiIncludes
)
add_definitions
(
-DDISALLOW_NCHAR_WITHOUT_ICONV
)
endif
()
...
...
source/os/src/osEnv.c
浏览文件 @
648e0ac1
...
...
@@ -70,11 +70,11 @@ void osDefaultInit() {
#elif defined(_TD_DARWIN_64)
if
(
configDir
[
0
]
==
0
)
{
strcpy
(
configDir
,
"/
tmp/taosd
"
);
strcpy
(
configDir
,
"/
usr/local/etc/taos
"
);
}
strcpy
(
tsDataDir
,
"/usr/local/var/lib/taos"
);
strcpy
(
tsLogDir
,
"/usr/local/var/log/taos"
);
strcpy
(
tsTempDir
,
"/
usr/local/etc/taos
"
);
strcpy
(
tsTempDir
,
"/
tmp/taosd
"
);
strcpy
(
tsOsName
,
"Darwin"
);
#else
...
...
source/util/src/terror.c
浏览文件 @
648e0ac1
...
...
@@ -187,9 +187,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode already exists"
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_SNODE_NOT_EXIST
,
"Snode not there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_BNODE_ALREADY_EXIST
,
"Bnode already exists"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_BNODE_NOT_EXIST
,
"Bnode not there"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_TOO_FEW_MNODES
,
"T
oo few mnodes
"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_
MNODE_DEPLOYED
,
"Mnode deployed in this dnode
"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_CANT_DROP_MASTER
,
"Can't drop mnode which is
master
"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_TOO_FEW_MNODES
,
"T
he replicas of mnode cannot less than 1
"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_
TOO_MANY_MNODES
,
"The replicas of mnode cannot exceed 3
"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_CANT_DROP_MASTER
,
"Can't drop mnode which is
LEADER
"
)
// mnode-acct
TAOS_DEFINE_ERROR
(
TSDB_CODE_MND_ACCT_ALREADY_EXIST
,
"Account already exists"
)
...
...
tests/script/api/batchprepare.c
浏览文件 @
648e0ac1
此差异已折叠。
点击以展开。
tests/script/jenkins/basic.txt
浏览文件 @
648e0ac1
...
...
@@ -57,7 +57,6 @@
# ---- mnode
./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim
./test.sh -f tsim/mnode/basic3.sim
# ---- show
./test.sh -f tsim/show/basic.sim
...
...
tests/script/tsim/mnode/basic3.sim
浏览文件 @
648e0ac1
...
...
@@ -2,14 +2,17 @@ system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode3 -s start
system sh/exec.sh -n dnode4 -s start
sql connect
print =============== step1: create dnodes
sql create dnode $hostname port 7200
sql create dnode $hostname port 7300
sql create dnode $hostname port 7400
$x = 0
step1:
...
...
@@ -32,6 +35,7 @@ endi
print =============== step2: create mnode 2
sql create mnode on dnode 2
sql create mnode on dnode 3
sql_error create mnode on dnode 4
$x = 0
step2:
...
...
@@ -106,6 +110,10 @@ print $data(1)[0] $data(1)[1] $data(1)[2]
print $data(2)[0] $data(2)[1] $data(2)[2]
print $data(3)[0] $data(3)[1] $data(3)[2]
if $data(2)[2] != OFFLINE then
goto step5
endi
sql show users
if $rows != 2 then
return -1
...
...
@@ -134,4 +142,5 @@ endi
system sh/exec.sh -n dnode1 -s stop
system sh/exec.sh -n dnode2 -s stop
system sh/exec.sh -n dnode3 -s stop
\ No newline at end of file
system sh/exec.sh -n dnode3 -s stop
system sh/exec.sh -n dnode4 -s stop
\ No newline at end of file
tests/script/tsim/trans/create_db.sim
浏览文件 @
648e0ac1
...
...
@@ -76,14 +76,6 @@ if $data[0][3] != d1 then
return -1
endi
if $data[0][4] != create-db then
return -1
endi
if $data[0][7] != @Unable to establish connection@ then
return -1
endi
sql_error create database d1 vgroups 2;
print =============== start dnode2
...
...
@@ -125,15 +117,7 @@ endi
if $data[0][3] != d2 then
return -1
endi
if $data[0][4] != create-db then
return -1
endi
if $data[0][7] != @Unable to establish connection@ then
return -1
endi
return
sql_error create database d2 vgroups 2;
print =============== kill transaction
...
...
tests/test/c/sdbDump.c
浏览文件 @
648e0ac1
...
...
@@ -279,9 +279,9 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject
(
item
,
"id"
,
pObj
->
id
);
tjsonAddIntegerToObject
(
item
,
"stage"
,
pObj
->
stage
);
tjsonAddIntegerToObject
(
item
,
"policy"
,
pObj
->
policy
);
tjsonAddIntegerToObject
(
item
,
"type"
,
pObj
->
type
);
tjsonAddIntegerToObject
(
item
,
"conflict"
,
pObj
->
conflict
);
tjsonAddIntegerToObject
(
item
,
"exec"
,
pObj
->
exec
);
tjsonAddStringToObject
(
item
,
"createdTime"
,
i642str
(
pObj
->
createdTime
));
tjsonAddStringToObject
(
item
,
"dbUid"
,
i642str
(
pObj
->
dbUid
));
tjsonAddStringToObject
(
item
,
"dbname"
,
pObj
->
dbname
);
tjsonAddIntegerToObject
(
item
,
"commitLogNum"
,
taosArrayGetSize
(
pObj
->
commitActions
));
tjsonAddIntegerToObject
(
item
,
"redoActionNum"
,
taosArrayGetSize
(
pObj
->
redoActions
));
...
...
taos-tools
@
717f5aaa
比较
4d83d8c6
...
717f5aaa
Subproject commit
4d83d8c62973506f760bcaa3a33f4665ed9046d0
Subproject commit
717f5aaa5f0a1b4d92bb2ae68858fec554fb5eda
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录