Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
58352b76
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
58352b76
编写于
3月 29, 2022
作者:
L
Liu Jicong
提交者:
GitHub
3月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11091 from taosdata/feature/tq
stream set multi input
上级
f37421e7
e832deed
变更
15
展开全部
隐藏空白更改
内联
并排
Showing
15 changed file
with
2587 addition
and
2643 deletion
+2587
-2643
example/src/tmq.c
example/src/tmq.c
+3
-2
include/client/taos.h
include/client/taos.h
+25
-10
include/common/tmsg.h
include/common/tmsg.h
+11
-45
include/libs/executor/executor.h
include/libs/executor/executor.h
+3
-4
source/client/src/clientMain.c
source/client/src/clientMain.c
+122
-101
source/client/src/tmq.c
source/client/src/tmq.c
+30
-10
source/client/test/tmqTest.cpp
source/client/test/tmqTest.cpp
+8
-6
source/dnode/mnode/impl/src/mndInfoSchema.c
source/dnode/mnode/impl/src/mndInfoSchema.c
+177
-139
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+1
-295
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+6
-5
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+8
-4
source/libs/executor/src/executor.c
source/libs/executor/src/executor.c
+7
-6
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+2158
-1989
source/libs/stream/src/tstream.c
source/libs/stream/src/tstream.c
+15
-14
tests/script/tsim/tmq/basic1.sim
tests/script/tsim/tmq/basic1.sim
+13
-13
未找到文件。
example/src/tmq.c
浏览文件 @
58352b76
...
...
@@ -80,8 +80,9 @@ int32_t create_topic() {
}
taos_free_result
(
pRes
);
const
char
*
sql
=
"select * from tu1"
;
pRes
=
tmq_create_topic
(
pConn
,
"test_stb_topic_1"
,
sql
,
strlen
(
sql
));
/*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
pRes
=
taos_query
(
pConn
,
"create topic test_stb_topic_1 as select * from tu1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic test_stb_topic_1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
include/client/taos.h
浏览文件 @
58352b76
...
...
@@ -26,9 +26,11 @@ extern "C" {
typedef
void
TAOS
;
typedef
void
TAOS_STMT
;
typedef
void
TAOS_RES
;
typedef
void
**
TAOS_ROW
;
#if 0
typedef void TAOS_STREAM;
typedef void TAOS_SUB;
typedef
void
**
TAOS_ROW
;
#endif
// Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
...
...
@@ -149,6 +151,7 @@ DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT
TAOS_RES
*
taos_stmt_use_result
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_close
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
char
*
taos_stmt_errstr
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
int
taos_stmt_affected_rows
(
TAOS_STMT
*
stmt
);
DLL_EXPORT
TAOS_RES
*
taos_query
(
TAOS
*
taos
,
const
char
*
sql
);
DLL_EXPORT
TAOS_RES
*
taos_query_l
(
TAOS
*
taos
,
const
char
*
sql
,
int
sqlLen
);
...
...
@@ -179,6 +182,7 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT
void
taos_query_a
(
TAOS
*
taos
,
const
char
*
sql
,
__taos_async_fn_t
fp
,
void
*
param
);
DLL_EXPORT
void
taos_fetch_rows_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
);
#if 0
typedef void (*__taos_sub_fn_t)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, __taos_sub_fn_t fp,
void *param, int interval);
...
...
@@ -188,6 +192,7 @@ DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *));
DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
#endif
DLL_EXPORT
int
taos_load_table_info
(
TAOS
*
taos
,
const
char
*
tableNameList
);
DLL_EXPORT
TAOS_RES
*
taos_schemaless_insert
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
int
protocol
,
int
precision
);
...
...
@@ -215,8 +220,12 @@ DLL_EXPORT tmq_list_t *tmq_list_new();
DLL_EXPORT
int32_t
tmq_list_append
(
tmq_list_t
*
,
const
char
*
);
DLL_EXPORT
void
tmq_list_destroy
(
tmq_list_t
*
);
DLL_EXPORT
tmq_t
*
tmq_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
tmq_t
*
tmq_consumer_new1
(
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
// will be removed in 3.0
DLL_EXPORT
tmq_t
*
tmq_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
// will replace last one
DLL_EXPORT
tmq_t
*
tmq_consumer_new1
(
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
const
char
*
tmq_err2str
(
tmq_resp_err_t
);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
...
...
@@ -255,20 +264,26 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT
TAOS_ROW
tmq_get_row
(
tmq_message_t
*
message
);
DLL_EXPORT
char
*
tmq_get_topic_name
(
tmq_message_t
*
message
);
DLL_EXPORT
void
*
tmq_get_topic_schema
(
tmq_t
*
tmq
,
const
char
*
topic
);
DLL_EXPORT
void
tmq_message_destroy
(
tmq_message_t
*
tmq_message
);
DLL_EXPORT
TAOS_ROW
tmq_get_row
(
tmq_message_t
*
message
);
DLL_EXPORT
char
*
tmq_get_topic_name
(
tmq_message_t
*
message
);
DLL_EXPORT
int32_t
tmq_get_vgroup_id
(
tmq_message_t
*
message
);
DLL_EXPORT
int64_t
tmq_get_request_offset
(
tmq_message_t
*
message
);
DLL_EXPORT
int64_t
tmq_get_response_offset
(
tmq_message_t
*
message
);
DLL_EXPORT
TAOS_FIELD
*
tmq_get_fields
(
tmq_t
*
tmq
,
const
char
*
topic
);
DLL_EXPORT
int32_t
tmq_field_count
(
tmq_t
*
tmq
,
const
char
*
topic
);
DLL_EXPORT
void
tmq_message_destroy
(
tmq_message_t
*
tmq_message
);
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
#endif
DLL_EXPORT
TAOS_RES
*
tmq_create_stream
(
TAOS
*
taos
,
const
char
*
streamName
,
const
char
*
tbName
,
const
char
*
sql
);
/* -------------------------------- OTHER -------------------------------- */
/* ------------------------------ TMQ END -------------------------------- */
#if 0
typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
DLL_EXPORT
int
taos_stmt_affected_rows
(
TAOS_STMT
*
stmt
);
#endif
#ifdef __cplusplus
}
...
...
include/common/tmsg.h
浏览文件 @
58352b76
...
...
@@ -2215,23 +2215,6 @@ static FORCE_INLINE void* tDecodeTSmaWrapper(void* buf, STSmaWrapper* pSW) {
return
buf
;
}
typedef
struct
{
int64_t
uid
;
int32_t
numOfRows
;
char
*
colData
;
}
SMqTbData
;
typedef
struct
{
char
topicName
[
TSDB_TOPIC_FNAME_LEN
];
int64_t
committedOffset
;
int64_t
reqOffset
;
int64_t
rspOffset
;
int32_t
skipLogNum
;
int32_t
bodyLen
;
int32_t
numOfTb
;
SMqTbData
*
tbData
;
}
SMqTopicData
;
typedef
struct
{
int8_t
mqMsgType
;
int32_t
code
;
...
...
@@ -2259,8 +2242,11 @@ typedef struct {
}
SMqSubVgEp
;
typedef
struct
{
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
SArray
*
vgs
;
// SArray<SMqSubVgEp>
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
int8_t
isSchemaAdaptive
;
SArray
*
vgs
;
// SArray<SMqSubVgEp>
int32_t
numOfFields
;
TAOS_FIELD
*
fields
;
}
SMqSubTopicEp
;
typedef
struct
{
...
...
@@ -2281,32 +2267,6 @@ typedef struct {
SArray
*
topics
;
// SArray<SMqSubTopicEp>
}
SMqCMGetSubEpRsp
;
typedef
struct
{
int32_t
curBlock
;
int32_t
curRow
;
void
**
uData
;
}
SMqRowIter
;
struct
tmq_message_t
{
SMqPollRsp
msg
;
void
*
vg
;
SMqRowIter
iter
;
};
#if 0
struct tmq_message_t {
SMqRspHead head;
union {
SMqPollRsp consumeRsp;
SMqCMGetSubEpRsp getEpRsp;
};
void* extra;
int32_t curBlock;
int32_t curRow;
void** uData;
};
#endif
static
FORCE_INLINE
void
tDeleteSMqSubTopicEp
(
SMqSubTopicEp
*
pSubTopicEp
)
{
taosArrayDestroy
(
pSubTopicEp
->
vgs
);
}
static
FORCE_INLINE
int32_t
tEncodeSMqSubVgEp
(
void
**
buf
,
const
SMqSubVgEp
*
pVgEp
)
{
...
...
@@ -2331,17 +2291,21 @@ static FORCE_INLINE void tDeleteSMqCMGetSubEpRsp(SMqCMGetSubEpRsp* pRsp) {
static
FORCE_INLINE
int32_t
tEncodeSMqSubTopicEp
(
void
**
buf
,
const
SMqSubTopicEp
*
pTopicEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pTopicEp
->
topic
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pTopicEp
->
isSchemaAdaptive
);
int32_t
sz
=
taosArrayGetSize
(
pTopicEp
->
vgs
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqSubVgEp
*
pVgEp
=
(
SMqSubVgEp
*
)
taosArrayGet
(
pTopicEp
->
vgs
,
i
);
tlen
+=
tEncodeSMqSubVgEp
(
buf
,
pVgEp
);
}
tlen
+=
taosEncodeFixedI32
(
buf
,
pTopicEp
->
numOfFields
);
// tlen += taosEncodeBinary(buf, pTopicEp->fields, pTopicEp->numOfFields * sizeof(TAOS_FIELD));
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqSubTopicEp
(
void
*
buf
,
SMqSubTopicEp
*
pTopicEp
)
{
buf
=
taosDecodeStringTo
(
buf
,
pTopicEp
->
topic
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pTopicEp
->
isSchemaAdaptive
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pTopicEp
->
vgs
=
taosArrayInit
(
sz
,
sizeof
(
SMqSubVgEp
));
...
...
@@ -2353,6 +2317,8 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
buf
=
tDecodeSMqSubVgEp
(
buf
,
&
vgEp
);
taosArrayPush
(
pTopicEp
->
vgs
,
&
vgEp
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
pTopicEp
->
numOfFields
);
// buf = taosDecodeBinary(buf, (void**)&pTopicEp->fields, pTopicEp->numOfFields * sizeof(TAOS_FIELD));
return
buf
;
}
...
...
include/libs/executor/executor.h
浏览文件 @
58352b76
...
...
@@ -20,8 +20,8 @@
extern
"C"
{
#endif
#include "tcommon.h"
#include "query.h"
#include "tcommon.h"
typedef
void
*
qTaskInfo_t
;
typedef
void
*
DataSinkHandle
;
...
...
@@ -36,7 +36,7 @@ typedef struct SReadHandle {
#define STREAM_DATA_TYPE_SUBMIT_BLOCK 0x1
#define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2
/**
* Create the exec task for streaming mode
* @param pMsg
...
...
@@ -62,7 +62,7 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type);
* @param type
* @return
*/
int32_t
qSetMultiStreamInput
(
qTaskInfo_t
tinfo
,
void
*
*
pBlocks
,
size_t
numOfBlocks
,
int32_t
type
);
int32_t
qSetMultiStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
pBlocks
,
size_t
numOfBlocks
,
int32_t
type
);
/**
* Update the table id list, add or remove.
...
...
@@ -167,7 +167,6 @@ int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type);
*/
void
**
qReleaseTask
(
void
*
pMgmt
,
void
*
pQInfo
,
bool
freeHandle
);
void
qProcessFetchRsp
(
void
*
parent
,
struct
SRpcMsg
*
pMsg
,
struct
SEpSet
*
pEpSet
);
#ifdef __cplusplus
...
...
source/client/src/clientMain.c
浏览文件 @
58352b76
#include "os.h"
#include "tref.h"
#include "trpc.h"
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "catalog.h"
#include "clientInt.h"
#include "clientLog.h"
#include "os.h"
#include "query.h"
#include "tmsg.h"
#include "tglobal.h"
#include "catalog.h"
#include "tmsg.h"
#include "tref.h"
#include "trpc.h"
#include "version.h"
#define TSC_VAR_NOT_RELEASE 1
...
...
@@ -24,7 +39,7 @@ int taos_options(TSDB_OPTION option, const void *arg, ...) {
}
}
int
ret
=
taos_options_imp
(
option
,
(
const
char
*
)
arg
);
int
ret
=
taos_options_imp
(
option
,
(
const
char
*
)
arg
);
atomic_store_32
(
&
lock
,
0
);
return
ret
;
}
...
...
@@ -69,13 +84,13 @@ TAOS *taos_connect(const char *ip, const char *user, const char *pass, const cha
return
taos_connect_internal
(
ip
,
user
,
pass
,
NULL
,
db
,
port
);
}
void
taos_close
(
TAOS
*
taos
)
{
void
taos_close
(
TAOS
*
taos
)
{
if
(
taos
==
NULL
)
{
return
;
}
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
tscDebug
(
"0x%"
PRIx64
" try to close connection, numOfReq:%d"
,
pTscObj
->
id
,
pTscObj
->
numOfReqs
);
tscDebug
(
"0x%"
PRIx64
" try to close connection, numOfReq:%d"
,
pTscObj
->
id
,
pTscObj
->
numOfReqs
);
/*taosRemoveRef(clientConnRefPool, pTscObj->id);*/
}
...
...
@@ -85,48 +100,46 @@ int taos_errno(TAOS_RES *tres) {
return
terrno
;
}
return
((
SRequestObj
*
)
tres
)
->
code
;
return
((
SRequestObj
*
)
tres
)
->
code
;
}
const
char
*
taos_errstr
(
TAOS_RES
*
res
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
==
NULL
)
{
return
(
const
char
*
)
tstrerror
(
terrno
);
return
(
const
char
*
)
tstrerror
(
terrno
);
}
if
(
strlen
(
pRequest
->
msgBuf
)
>
0
||
pRequest
->
code
==
TSDB_CODE_RPC_FQDN_ERROR
)
{
return
pRequest
->
msgBuf
;
}
else
{
return
(
const
char
*
)
tstrerror
(
pRequest
->
code
);
return
(
const
char
*
)
tstrerror
(
pRequest
->
code
);
}
}
void
taos_free_result
(
TAOS_RES
*
res
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
destroyRequest
(
pRequest
);
}
int
taos_field_count
(
TAOS_RES
*
res
)
{
int
taos_field_count
(
TAOS_RES
*
res
)
{
if
(
res
==
NULL
)
{
return
0
;
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
return
pResInfo
->
numOfCols
;
}
int
taos_num_fields
(
TAOS_RES
*
res
)
{
return
taos_field_count
(
res
);
}
int
taos_num_fields
(
TAOS_RES
*
res
)
{
return
taos_field_count
(
res
);
}
TAOS_FIELD
*
taos_fetch_fields
(
TAOS_RES
*
res
)
{
if
(
taos_num_fields
(
res
)
==
0
)
{
return
NULL
;
}
SReqResultInfo
*
pResInfo
=
&
(((
SRequestObj
*
)
res
)
->
body
.
resInfo
);
SReqResultInfo
*
pResInfo
=
&
(((
SRequestObj
*
)
res
)
->
body
.
resInfo
);
return
pResInfo
->
fields
;
}
...
...
@@ -135,7 +148,7 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
return
NULL
;
}
return
taos_query_l
(
taos
,
sql
,
(
int32_t
)
strlen
(
sql
));
return
taos_query_l
(
taos
,
sql
,
(
int32_t
)
strlen
(
sql
));
}
TAOS_ROW
taos_fetch_row
(
TAOS_RES
*
res
)
{
...
...
@@ -143,18 +156,16 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return
NULL
;
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
->
type
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pRequest
->
type
==
TSDB_SQL_INSERT
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
||
taos_num_fields
(
res
)
==
0
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
->
type
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pRequest
->
type
==
TSDB_SQL_INSERT
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
||
taos_num_fields
(
res
)
==
0
)
{
return
NULL
;
}
return
doFetchRow
(
pRequest
,
true
);
}
int
taos_print_row
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
)
{
int
taos_print_row
(
char
*
str
,
TAOS_ROW
row
,
TAOS_FIELD
*
fields
,
int
num_fields
)
{
int32_t
len
=
0
;
for
(
int
i
=
0
;
i
<
num_fields
;
++
i
)
{
if
(
i
>
0
)
{
...
...
@@ -213,7 +224,7 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
case
TSDB_DATA_TYPE_BINARY
:
case
TSDB_DATA_TYPE_NCHAR
:
{
int32_t
charLen
=
varDataLen
((
char
*
)
row
[
i
]
-
VARSTR_HEADER_SIZE
);
int32_t
charLen
=
varDataLen
((
char
*
)
row
[
i
]
-
VARSTR_HEADER_SIZE
);
if
(
fields
[
i
].
type
==
TSDB_DATA_TYPE_BINARY
)
{
assert
(
charLen
<=
fields
[
i
].
bytes
&&
charLen
>=
0
);
}
else
{
...
...
@@ -238,31 +249,44 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
return
len
;
}
int
*
taos_fetch_lengths
(
TAOS_RES
*
res
)
{
int
*
taos_fetch_lengths
(
TAOS_RES
*
res
)
{
if
(
res
==
NULL
)
{
return
NULL
;
}
return
((
SRequestObj
*
)
res
)
->
body
.
resInfo
.
length
;
return
((
SRequestObj
*
)
res
)
->
body
.
resInfo
.
length
;
}
// todo intergrate with tDataTypes
const
char
*
taos_data_type
(
int
type
)
{
switch
(
type
)
{
case
TSDB_DATA_TYPE_NULL
:
return
"TSDB_DATA_TYPE_NULL"
;
case
TSDB_DATA_TYPE_BOOL
:
return
"TSDB_DATA_TYPE_BOOL"
;
case
TSDB_DATA_TYPE_TINYINT
:
return
"TSDB_DATA_TYPE_TINYINT"
;
case
TSDB_DATA_TYPE_SMALLINT
:
return
"TSDB_DATA_TYPE_SMALLINT"
;
case
TSDB_DATA_TYPE_INT
:
return
"TSDB_DATA_TYPE_INT"
;
case
TSDB_DATA_TYPE_BIGINT
:
return
"TSDB_DATA_TYPE_BIGINT"
;
case
TSDB_DATA_TYPE_FLOAT
:
return
"TSDB_DATA_TYPE_FLOAT"
;
case
TSDB_DATA_TYPE_DOUBLE
:
return
"TSDB_DATA_TYPE_DOUBLE"
;
case
TSDB_DATA_TYPE_VARCHAR
:
return
"TSDB_DATA_TYPE_VARCHAR"
;
// case TSDB_DATA_TYPE_BINARY: return "TSDB_DATA_TYPE_VARCHAR";
case
TSDB_DATA_TYPE_TIMESTAMP
:
return
"TSDB_DATA_TYPE_TIMESTAMP"
;
case
TSDB_DATA_TYPE_NCHAR
:
return
"TSDB_DATA_TYPE_NCHAR"
;
case
TSDB_DATA_TYPE_JSON
:
return
"TSDB_DATA_TYPE_JSON"
;
default:
return
"UNKNOWN"
;
case
TSDB_DATA_TYPE_NULL
:
return
"TSDB_DATA_TYPE_NULL"
;
case
TSDB_DATA_TYPE_BOOL
:
return
"TSDB_DATA_TYPE_BOOL"
;
case
TSDB_DATA_TYPE_TINYINT
:
return
"TSDB_DATA_TYPE_TINYINT"
;
case
TSDB_DATA_TYPE_SMALLINT
:
return
"TSDB_DATA_TYPE_SMALLINT"
;
case
TSDB_DATA_TYPE_INT
:
return
"TSDB_DATA_TYPE_INT"
;
case
TSDB_DATA_TYPE_BIGINT
:
return
"TSDB_DATA_TYPE_BIGINT"
;
case
TSDB_DATA_TYPE_FLOAT
:
return
"TSDB_DATA_TYPE_FLOAT"
;
case
TSDB_DATA_TYPE_DOUBLE
:
return
"TSDB_DATA_TYPE_DOUBLE"
;
case
TSDB_DATA_TYPE_VARCHAR
:
return
"TSDB_DATA_TYPE_VARCHAR"
;
// case TSDB_DATA_TYPE_BINARY: return "TSDB_DATA_TYPE_VARCHAR";
case
TSDB_DATA_TYPE_TIMESTAMP
:
return
"TSDB_DATA_TYPE_TIMESTAMP"
;
case
TSDB_DATA_TYPE_NCHAR
:
return
"TSDB_DATA_TYPE_NCHAR"
;
case
TSDB_DATA_TYPE_JSON
:
return
"TSDB_DATA_TYPE_JSON"
;
default:
return
"UNKNOWN"
;
}
}
...
...
@@ -273,8 +297,8 @@ int taos_affected_rows(TAOS_RES *res) {
return
0
;
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
SReqResultInfo
*
pResInfo
=
&
pRequest
->
body
.
resInfo
;
return
pResInfo
->
numOfRows
;
}
...
...
@@ -295,8 +319,8 @@ int taos_select_db(TAOS *taos, const char *db) {
char
sql
[
256
]
=
{
0
};
snprintf
(
sql
,
tListLen
(
sql
),
"use %s"
,
db
);
TAOS_RES
*
pRequest
=
taos_query
(
taos
,
sql
);
int32_t
code
=
taos_errno
(
pRequest
);
TAOS_RES
*
pRequest
=
taos_query
(
taos
,
sql
);
int32_t
code
=
taos_errno
(
pRequest
);
taos_free_result
(
pRequest
);
return
code
;
...
...
@@ -307,61 +331,57 @@ void taos_stop_query(TAOS_RES *res) {
return
;
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
int32_t
numOfFields
=
taos_num_fields
(
pRequest
);
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
int32_t
numOfFields
=
taos_num_fields
(
pRequest
);
// It is not a query, no need to stop.
if
(
numOfFields
==
0
)
{
return
;
}
// scheduleCancelJob(pRequest->body.pQueryJob);
// scheduleCancelJob(pRequest->body.pQueryJob);
}
bool
taos_is_null
(
TAOS_RES
*
res
,
int32_t
row
,
int32_t
col
)
{
SRequestObj
*
pRequestObj
=
res
;
SReqResultInfo
*
pResultInfo
=
&
pRequestObj
->
body
.
resInfo
;
SRequestObj
*
pRequestObj
=
res
;
SReqResultInfo
*
pResultInfo
=
&
pRequestObj
->
body
.
resInfo
;
if
(
col
>=
pResultInfo
->
numOfCols
||
col
<
0
||
row
>=
pResultInfo
->
numOfRows
||
row
<
0
)
{
return
true
;
}
SResultColumn
*
pCol
=
&
pRequestObj
->
body
.
resInfo
.
pCol
[
col
];
SResultColumn
*
pCol
=
&
pRequestObj
->
body
.
resInfo
.
pCol
[
col
];
return
colDataIsNull_f
(
pCol
->
nullbitmap
,
row
);
}
int
taos_fetch_block
(
TAOS_RES
*
res
,
TAOS_ROW
*
rows
)
{
int
taos_fetch_block
(
TAOS_RES
*
res
,
TAOS_ROW
*
rows
)
{
if
(
res
==
NULL
)
{
return
0
;
}
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
->
type
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pRequest
->
type
==
TSDB_SQL_INSERT
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
||
taos_num_fields
(
res
)
==
0
)
{
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
if
(
pRequest
->
type
==
TSDB_SQL_RETRIEVE_EMPTY_RESULT
||
pRequest
->
type
==
TSDB_SQL_INSERT
||
pRequest
->
code
!=
TSDB_CODE_SUCCESS
||
taos_num_fields
(
res
)
==
0
)
{
return
0
;
}
doFetchRow
(
pRequest
,
false
);
// TODO refactor
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
SReqResultInfo
*
pResultInfo
=
&
pRequest
->
body
.
resInfo
;
pResultInfo
->
current
=
pResultInfo
->
numOfRows
;
*
rows
=
pResultInfo
->
row
;
return
pResultInfo
->
numOfRows
;
}
int
taos_validate_sql
(
TAOS
*
taos
,
const
char
*
sql
)
{
return
true
;
}
int
taos_validate_sql
(
TAOS
*
taos
,
const
char
*
sql
)
{
return
true
;
}
const
char
*
taos_get_server_info
(
TAOS
*
taos
)
{
if
(
taos
==
NULL
)
{
return
NULL
;
}
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
STscObj
*
pTscObj
=
(
STscObj
*
)
taos
;
return
pTscObj
->
ver
;
}
...
...
@@ -372,7 +392,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
void
taos_fetch_rows_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
)
{
// TODO
}
#if 0
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) {
// TODO
return NULL;
...
...
@@ -386,63 +406,64 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
// TODO
}
#endif
TAOS_STMT
*
taos_stmt_init
(
TAOS
*
taos
)
{
// TODO
return
NULL
;
TAOS_STMT
*
taos_stmt_init
(
TAOS
*
taos
)
{
// TODO
return
NULL
;
}
int
taos_stmt_close
(
TAOS_STMT
*
stmt
)
{
// TODO
return
-
1
;
int
taos_stmt_close
(
TAOS_STMT
*
stmt
)
{
// TODO
return
-
1
;
}
int
taos_stmt_execute
(
TAOS_STMT
*
stmt
)
{
// TODO
return
-
1
;
int
taos_stmt_execute
(
TAOS_STMT
*
stmt
)
{
// TODO
return
-
1
;
}
char
*
taos_stmt_errstr
(
TAOS_STMT
*
stmt
)
{
// TODO
return
NULL
;
// TODO
return
NULL
;
}
int
taos_stmt_affected_rows
(
TAOS_STMT
*
stmt
)
{
// TODO
return
-
1
;
int
taos_stmt_affected_rows
(
TAOS_STMT
*
stmt
)
{
// TODO
return
-
1
;
}
TAOS_RES
*
taos_schemaless_insert
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
int
protocol
,
int
precision
)
{
// TODO
return
NULL
;
TAOS_RES
*
taos_schemaless_insert
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
int
protocol
,
int
precision
)
{
// TODO
return
NULL
;
}
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_BIND
*
bind
)
{
// TODO
return
-
1
;
int
taos_stmt_bind_param
(
TAOS_STMT
*
stmt
,
TAOS_BIND
*
bind
)
{
// TODO
return
-
1
;
}
int
taos_stmt_prepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
)
{
// TODO
return
-
1
;
int
taos_stmt_prepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
)
{
// TODO
return
-
1
;
}
int
taos_stmt_set_tbname_tags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_BIND
*
tags
)
{
// TODO
return
-
1
;
int
taos_stmt_set_tbname_tags
(
TAOS_STMT
*
stmt
,
const
char
*
name
,
TAOS_BIND
*
tags
)
{
// TODO
return
-
1
;
}
int
taos_stmt_set_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
)
{
// TODO
return
-
1
;
int
taos_stmt_set_tbname
(
TAOS_STMT
*
stmt
,
const
char
*
name
)
{
// TODO
return
-
1
;
}
int
taos_stmt_add_batch
(
TAOS_STMT
*
stmt
)
{
// TODO
return
-
1
;
int
taos_stmt_add_batch
(
TAOS_STMT
*
stmt
)
{
// TODO
return
-
1
;
}
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
)
{
// TODO
return
-
1
;
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
)
{
// TODO
return
-
1
;
}
source/client/src/tmq.c
浏览文件 @
58352b76
...
...
@@ -26,6 +26,18 @@
#include "tqueue.h"
#include "tref.h"
typedef
struct
{
int32_t
curBlock
;
int32_t
curRow
;
void
**
uData
;
}
SMqRowIter
;
struct
tmq_message_t
{
SMqPollRsp
msg
;
void
*
vg
;
SMqRowIter
iter
;
};
struct
tmq_list_t
{
SArray
container
;
};
...
...
@@ -99,13 +111,14 @@ typedef struct {
typedef
struct
{
// subscribe info
int32_t
sqlLen
;
char
*
sql
;
char
*
topicName
;
int64_t
topicId
;
int32_t
nextVgIdx
;
SArray
*
vgs
;
// SArray<SMqClientVg>
SSchemaWrapper
schema
;
int32_t
sqlLen
;
char
*
sql
;
char
*
topicName
;
int64_t
topicId
;
SArray
*
vgs
;
// SArray<SMqClientVg>
int8_t
isSchemaAdaptive
;
int32_t
numOfFields
;
TAOS_FIELD
*
fields
;
}
SMqClientTopic
;
typedef
struct
{
...
...
@@ -130,11 +143,11 @@ typedef struct {
}
SMqPollCbParam
;
typedef
struct
{
tmq_t
*
tmq
;
/*SMqClientVg* pVg;*/
tmq_t
*
tmq
;
int32_t
async
;
tsem_t
rspSem
;
tmq_resp_err_t
rspErr
;
/*SMqClientVg* pVg;*/
}
SMqCommitCbParam
;
tmq_conf_t
*
tmq_conf_new
()
{
...
...
@@ -471,7 +484,12 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
tNameExtractFullName
(
&
name
,
topicFname
);
tscDebug
(
"subscribe topic: %s"
,
topicFname
);
SMqClientTopic
topic
=
{
.
nextVgIdx
=
0
,
.
sql
=
NULL
,
.
sqlLen
=
0
,
.
topicId
=
0
,
.
topicName
=
topicFname
,
.
vgs
=
NULL
};
.
sql
=
NULL
,
.
sqlLen
=
0
,
.
topicId
=
0
,
.
topicName
=
topicFname
,
.
vgs
=
NULL
,
};
topic
.
vgs
=
taosArrayInit
(
0
,
sizeof
(
SMqClientVg
));
taosArrayPush
(
tmq
->
clientTopics
,
&
topic
);
taosArrayPush
(
req
.
topicNames
,
&
topicFname
);
...
...
@@ -615,6 +633,7 @@ _return:
return
pRequest
;
}
#if 0
TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL;
...
...
@@ -700,6 +719,7 @@ _return:
return pRequest;
}
#endif
static
char
*
formatTimestamp
(
char
*
buf
,
int64_t
val
,
int
precision
)
{
time_t
tt
;
...
...
source/client/test/tmqTest.cpp
浏览文件 @
58352b76
...
...
@@ -35,7 +35,7 @@ int main(int argc, char** argv) {
TEST
(
testCase
,
driverInit_Test
)
{
// taosInitGlobalCfg();
// taos_init();
// taos_init();
}
TEST
(
testCase
,
create_topic_ctb_Test
)
{
...
...
@@ -55,8 +55,9 @@ TEST(testCase, create_topic_ctb_Test) {
taos_free_result
(
pRes
);
char
*
sql
=
"select * from tu"
;
pRes
=
tmq_create_topic
(
pConn
,
"test_ctb_topic_1"
,
sql
,
strlen
(
sql
));
// char* sql = "select * from tu";
// pRes = tmq_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql));
pRes
=
taos_query
(
pConn
,
"create test_ctb_topic_1 as select * from tu"
);
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
...
...
@@ -69,7 +70,7 @@ TEST(testCase, create_topic_stb_Test) {
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"error in use db, reason:%s
\n
"
,
taos_errstr
(
pRes
));
}
//taos_free_result(pRes);
//
taos_free_result(pRes);
TAOS_FIELD
*
pFields
=
taos_fetch_fields
(
pRes
);
ASSERT_TRUE
(
pFields
==
nullptr
);
...
...
@@ -79,8 +80,9 @@ TEST(testCase, create_topic_stb_Test) {
taos_free_result
(
pRes
);
char
*
sql
=
"select * from st1"
;
pRes
=
tmq_create_topic
(
pConn
,
"test_stb_topic_1"
,
sql
,
strlen
(
sql
));
// char* sql = "select * from st1";
// pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
pRes
=
taos_query
(
pConn
,
"create test_ctb_topic_1 as select * from st1"
);
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
...
...
source/dnode/mnode/impl/src/mndInfoSchema.c
浏览文件 @
58352b76
此差异已折叠。
点击以展开。
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
58352b76
...
...
@@ -62,8 +62,6 @@ static int32_t mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqT
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
);
static
int32_t
mndInitUnassignedVg
(
SMnode
*
pMnode
,
const
SMqTopicObj
*
pTopic
,
SMqSubscribeObj
*
pSub
);
int32_t
mndInitSubscribe
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SUBSCRIBE
,
.
keyType
=
SDB_KEY_BINARY
,
...
...
@@ -98,14 +96,6 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
return
NULL
;
}
#if 0
if (mndInitUnassignedVg(pMnode, pTopic, pSub) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tDeleteSMqSubscribeObj(pSub);
taosMemoryFree(pSub);
return NULL;
}
#endif
// TODO: disable alter subscribed table
return
pSub
;
}
...
...
@@ -210,45 +200,6 @@ static int32_t mndPersistCancelConnReq(SMnode *pMnode, STrans *pTrans, const SMq
return
0
;
}
#if 0
static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg) {
SMnode *pMnode = pMsg->pNode;
uint8_t *str = pMsg->rpcMsg.pCont;
SMqCMResetOffsetReq req;
SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, str, pMsg->rpcMsg.contLen, TD_DECODER);
tDecodeSMqCMResetOffsetReq(&decoder, &req);
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (pHash == NULL) {
return -1;
}
for (int32_t i = 0; i < req.num; i++) {
SMqOffset *pOffset = &req.offsets[i];
SMqVgOffsets *pVgOffset = taosHashGet(pHash, &pOffset->vgId, sizeof(int32_t));
if (pVgOffset == NULL) {
pVgOffset = taosMemoryMalloc(sizeof(SMqVgOffsets));
if (pVgOffset == NULL) {
return -1;
}
pVgOffset->offsets = taosArrayInit(0, sizeof(void *));
taosArrayPush(pVgOffset->offsets, &pOffset);
}
taosHashPut(pHash, &pOffset->vgId, sizeof(int32_t), &pVgOffset, sizeof(void *));
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg);
if (pTrans == NULL) {
mError("mq-reset-offset: failed since %s", terrstr());
return -1;
}
return 0;
}
#endif
static
int32_t
mndProcessGetSubEpReq
(
SNodeMsg
*
pMsg
)
{
SMnode
*
pMnode
=
pMsg
->
pNode
;
SMqCMGetSubEpReq
*
pReq
=
(
SMqCMGetSubEpReq
*
)
pMsg
->
rpcMsg
.
pCont
;
...
...
@@ -574,251 +525,6 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
return
0
;
}
#if 0
for (int32_t j = 0; j < consumerNum; j++) {
bool changed = false;
bool unfished = false;
bool canUseLeft = imbalanceSolved < imbalanceVg;
bool mustUseLeft = canUseLeft && (imbalanceVg - imbalanceSolved >= consumerNum - j);
ASSERT(imbalanceVg - imbalanceSolved <= consumerNum - j);
int32_t maxVg = vgEachConsumer + canUseLeft;
int32_t minVg = vgEachConsumer + mustUseLeft;
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
int32_t vgThisConsumerAfterRb;
if (vgThisConsumerBeforeRb > maxVg) {
vgThisConsumerAfterRb = maxVg;
imbalanceSolved++;
changed = true;
} else if (vgThisConsumerBeforeRb < minVg) {
vgThisConsumerAfterRb = minVg;
if (mustUseLeft) imbalanceSolved++;
changed = true;
} else {
vgThisConsumerAfterRb = vgThisConsumerBeforeRb;
}
if (vgThisConsumerBeforeRb > vgThisConsumerAfterRb) {
while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
// put into unassigned
SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
ASSERT(pConsumerEp != NULL);
ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
taosArrayPush(unassignedVgStash, pConsumerEp);
}
} else if (vgThisConsumerBeforeRb < vgThisConsumerAfterRb) {
// assign from unassigned
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
// if no unassgined, save j
if (taosArrayGetSize(unassignedVgStash) == 0) {
taosArrayPush(unassignedConsumerIdx, &j);
unfished = true;
break;
}
// assign vg to consumer
SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
ASSERT(pConsumerEp != NULL);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = pSubConsumer->consumerId;
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
// build msg and persist into trans
}
}
if (changed && !unfished) {
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
pRebConsumer->epoch++;
if (vgThisConsumerAfterRb != 0) {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
} else {
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
}
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw);
mndReleaseConsumer(pMnode, pRebConsumer);
// TODO: save history
}
}
for (int32_t j = 0; j < taosArrayGetSize(unassignedConsumerIdx); j++) {
bool canUseLeft = imbalanceSolved < imbalanceVg;
int32_t consumerIdx = *(int32_t *)taosArrayGet(unassignedConsumerIdx, j);
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, consumerIdx);
if (canUseLeft) imbalanceSolved++;
// must use
int32_t vgThisConsumerAfterRb = taosArrayGetSize(pSubConsumer->vgInfo) + canUseLeft;
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer + canUseLeft) {
// assign vg to consumer
SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
ASSERT(pConsumerEp != NULL);
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
pConsumerEp->consumerId = pSubConsumer->consumerId;
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
// build msg and persist into trans
}
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
pRebConsumer->epoch++;
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pConsumerRaw);
mndReleaseConsumer(pMnode, pRebConsumer);
// TODO: save history
}
#endif
#if 0
//update consumer status for the subscribption
for (int32_t i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
int64_t consumerId = pCEp->consumerId;
if (pCEp->status != -1) {
int32_t consumerHbStatus = atomic_fetch_add_32(&pCEp->consumerHbStatus, 1);
if (consumerHbStatus < MND_SUBSCRIBE_REBALANCE_CNT) {
continue;
}
// put consumer into lostConsumer
SMqConsumerEp* lostConsumer = taosArrayPush(pSub->lostConsumer, pCEp);
lostConsumer->qmsg = NULL;
// put vg into unassigned
taosArrayPush(pSub->unassignedVg, pCEp);
// remove from assigned
// TODO: swap with last one, reduce size and reset i
taosArrayRemove(pSub->assigned, i);
// remove from available consumer
for (int32_t j = 0; j < taosArrayGetSize(pSub->availConsumer); j++) {
if (*(int64_t *)taosArrayGet(pSub->availConsumer, i) == pCEp->consumerId) {
taosArrayRemove(pSub->availConsumer, j);
break;
}
// TODO: acquire consumer, set status to unavail
}
#if 0
SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId);
pConsumer->epoch++;
printf("current epoch %ld size %ld", pConsumer->epoch, pConsumer->topics->size);
SSdbRaw* pRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
sdbWriteNotFree(pMnode->pSdb, pRaw);
mndReleaseConsumer(pMnode, pConsumer);
#endif
}
}
// no available consumer, skip rebalance
if (taosArrayGetSize(pSub->availConsumer) == 0) {
continue;
}
taosArrayGet(pSub->availConsumer, 0);
// rebalance condition1 : have unassigned vg
// assign vg to a consumer, trying to find the least assigned one
if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) {
char *topic = NULL;
char *cgroup = NULL;
mndSplitSubscribeKey(pSub->key, &topic, &cgroup);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
for (int32_t i = 0; i < sz; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx);
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
pCEp->oldConsumerId = pCEp->consumerId;
pCEp->consumerId = consumerId;
taosArrayPush(pSub->assigned, pCEp);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
pConsumer->epoch++;
SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
sdbWrite(pMnode->pSdb, pConsumerRaw);
mndReleaseConsumer(pMnode, pConsumer);
void* msg;
int32_t msgLen;
mndBuildRebalanceMsg(&msg, &msgLen, pTopic, pCEp, cgroup, topic);
// persist msg
STransAction action = {0};
action.epSet = pCEp->epSet;
action.pCont = msg;
action.contLen = sizeof(SMsgHead) + msgLen;
action.msgType = TDMT_VND_MQ_SET_CONN;
mndTransAppendRedoAction(pTrans, &action);
// persist data
SSdbRaw *pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mndTransAppendRedolog(pTrans, pRaw);
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
}
mndReleaseTopic(pMnode, pTopic);
mndTransDrop(pTrans);
taosMemoryFreeClear(topic);
taosMemoryFreeClear(cgroup);
}
// rebalance condition2 : imbalance assignment
}
return 0;
}
#endif
#if 0
static int32_t mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub) {
SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL;
SQueryPlan *pPlan = qStringToQueryPlan(pTopic->physicalPlan);
SArray *pArray = NULL;
SNodeListNode *inner = (SNodeListNode*)nodesListGetNode(pPlan->pSubplans, 0);
SSubplan *plan = (SSubplan*)nodesListGetNode(inner->pNodeList, 0);
SArray *unassignedVg = pSub->unassignedVg;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pTopic->dbUid) {
sdbRelease(pSdb, pVgroup);
continue;
}
pSub->vgNum++;
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup);
if (schedulerConvertDagToTaskList(pPlan, &pArray) < 0) {
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
return -1;
}
SMqConsumerEp consumerEp = {0};
consumerEp.status = 0;
consumerEp.consumerId = -1;
STaskInfo *pTaskInfo = taosArrayGet(pArray, 0);
consumerEp.epSet = pTaskInfo->addr.epset;
consumerEp.vgId = pTaskInfo->addr.nodeId;
ASSERT(consumerEp.vgId == pVgroup->vgId);
consumerEp.qmsg = strdup(pTaskInfo->msg->msg);
taosArrayPush(unassignedVg, &consumerEp);
// TODO: free taskInfo
taosArrayDestroy(pArray);
}
/*qDestroyQueryDag(pDag);*/
return 0;
}
#endif
static
int32_t
mndPersistMqSetConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqTopicObj
*
pTopic
,
const
char
*
cgroup
,
const
SMqConsumerEp
*
pConsumerEp
)
{
ASSERT
(
pConsumerEp
->
oldConsumerId
==
-
1
);
...
...
@@ -1140,7 +846,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
}
}
if
(
oldSub
)
taosArrayDestroyEx
(
oldSub
,
(
void
(
*
)(
void
*
))
taosMemoryFree
);
if
(
oldSub
)
taosArrayDestroyEx
(
oldSub
,
(
void
(
*
)(
void
*
))
taosMemoryFree
);
// persist consumerObj
SSdbRaw
*
pConsumerRaw
=
mndConsumerActionEncode
(
pConsumer
);
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
58352b76
...
...
@@ -65,9 +65,11 @@ void mndCleanupTopic(SMnode *pMnode) {}
SSdbRaw
*
mndTopicActionEncode
(
SMqTopicObj
*
pTopic
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
int32_t
logicalPlanLen
=
strlen
(
pTopic
->
logicalPlan
)
+
1
;
int32_t
physicalPlanLen
=
strlen
(
pTopic
->
physicalPlan
)
+
1
;
int32_t
size
=
sizeof
(
SMqTopicObj
)
+
logicalPlanLen
+
physicalPlanLen
+
pTopic
->
sqlLen
+
MND_TOPIC_RESERVE_SIZE
;
int32_t
logicalPlanLen
=
strlen
(
pTopic
->
logicalPlan
)
+
1
;
int32_t
physicalPlanLen
=
strlen
(
pTopic
->
physicalPlan
)
+
1
;
int32_t
swLen
=
taosEncodeSSchemaWrapper
(
NULL
,
&
pTopic
->
schema
);
int32_t
size
=
sizeof
(
SMqTopicObj
)
+
logicalPlanLen
+
physicalPlanLen
+
pTopic
->
sqlLen
+
swLen
+
MND_TOPIC_RESERVE_SIZE
;
SSdbRaw
*
pRaw
=
sdbAllocRaw
(
SDB_TOPIC
,
MND_TOPIC_VER_NUMBER
,
size
);
if
(
pRaw
==
NULL
)
goto
TOPIC_ENCODE_OVER
;
...
...
@@ -86,8 +88,7 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
physicalPlanLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
physicalPlanLen
,
TOPIC_ENCODE_OVER
);
int32_t
swLen
=
taosEncodeSSchemaWrapper
(
NULL
,
&
pTopic
->
schema
);
void
*
swBuf
=
taosMemoryMalloc
(
swLen
);
void
*
swBuf
=
taosMemoryMalloc
(
swLen
);
if
(
swBuf
==
NULL
)
{
goto
TOPIC_ENCODE_OVER
;
}
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
58352b76
...
...
@@ -114,10 +114,9 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
SSchemaWrapper
*
pSchemaWrapper
=
pHandle
->
pSchemaWrapper
;
int32_t
numOfRows
=
pHandle
->
pBlock
->
numOfRows
;
int32_t
numOfCols
=
pHandle
->
pSchema
->
numOfCols
;
/*int32_t numOfCols = pHandle->pSchema->numOfCols;*/
int32_t
colNumNeed
=
taosArrayGetSize
(
pHandle
->
pColIdList
);
// TODO: stable case
if
(
colNumNeed
>
pSchemaWrapper
->
nCols
)
{
colNumNeed
=
pSchemaWrapper
->
nCols
;
}
...
...
@@ -138,19 +137,24 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
colNeed
++
;
}
else
{
SColumnInfoData
colInfo
=
{
0
};
int
sz
=
numOfRows
*
pColSchema
->
bytes
;
/*int sz = numOfRows * pColSchema->bytes;*/
colInfo
.
info
.
bytes
=
pColSchema
->
bytes
;
colInfo
.
info
.
colId
=
pColSchema
->
colId
;
colInfo
.
info
.
type
=
pColSchema
->
type
;
#if 0
colInfo.pData = taosMemoryCalloc(1, sz);
if (colInfo.pData == NULL) {
// TODO free
taosArrayDestroy(pArray);
return NULL;
}
#endif
blockDataEnsureColumnCapacity
(
&
colInfo
,
numOfRows
);
if
(
blockDataEnsureColumnCapacity
(
&
colInfo
,
numOfRows
)
<
0
)
{
taosArrayDestroyEx
(
pArray
,
(
void
(
*
)(
void
*
))
tDeleteSSDataBlock
);
return
NULL
;
}
taosArrayPush
(
pArray
,
&
colInfo
);
colMeta
++
;
colNeed
++
;
...
...
source/libs/executor/src/executor.c
浏览文件 @
58352b76
...
...
@@ -14,12 +14,12 @@
*/
#include "executor.h"
#include "tdatablock.h"
#include "executorimpl.h"
#include "planner.h"
#include "tdatablock.h"
#include "vnode.h"
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
*
input
,
size_t
numOfBlocks
,
int32_t
type
,
char
*
id
)
{
static
int32_t
doSetStreamBlock
(
SOperatorInfo
*
pOperator
,
void
*
input
,
size_t
numOfBlocks
,
int32_t
type
,
char
*
id
)
{
ASSERT
(
pOperator
!=
NULL
);
if
(
pOperator
->
operatorType
!=
QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN
)
{
if
(
pOperator
->
numOfDownstream
==
0
)
{
...
...
@@ -44,17 +44,18 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void** input, size_t n
}
if
(
type
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
)
{
if
(
tqReadHandleSetMsg
(
pInfo
->
readerHandle
,
input
[
0
]
,
0
)
<
0
)
{
if
(
tqReadHandleSetMsg
(
pInfo
->
readerHandle
,
input
,
0
)
<
0
)
{
qError
(
"submit msg messed up when initing stream block, %s"
PRIx64
,
id
);
return
TSDB_CODE_QRY_APP_ERROR
;
}
}
else
{
for
(
int32_t
i
=
0
;
i
<
numOfBlocks
;
++
i
)
{
SSDataBlock
*
pDataBlock
=
input
[
i
];
SSDataBlock
*
pDataBlock
=
&
((
SSDataBlock
*
)
input
)
[
i
];
SSDataBlock
*
p
=
createOneDataBlock
(
pDataBlock
);
p
->
info
=
pDataBlock
->
info
;
taosArrayClear
(
p
->
pDataBlock
);
taosArrayAddAll
(
p
->
pDataBlock
,
pDataBlock
->
pDataBlock
);
taosArrayPush
(
pInfo
->
pBlockLists
,
&
p
);
}
...
...
@@ -65,10 +66,10 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void** input, size_t n
}
int32_t
qSetStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
input
,
int32_t
type
)
{
qSetMultiStreamInput
(
tinfo
,
(
void
**
)
&
input
,
1
,
type
);
return
qSetMultiStreamInput
(
tinfo
,
input
,
1
,
type
);
}
int32_t
qSetMultiStreamInput
(
qTaskInfo_t
tinfo
,
void
*
*
pBlocks
,
size_t
numOfBlocks
,
int32_t
type
)
{
int32_t
qSetMultiStreamInput
(
qTaskInfo_t
tinfo
,
const
void
*
pBlocks
,
size_t
numOfBlocks
,
int32_t
type
)
{
if
(
tinfo
==
NULL
)
{
return
TSDB_CODE_QRY_APP_ERROR
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
58352b76
此差异已折叠。
点击以展开。
source/libs/stream/src/tstream.c
浏览文件 @
58352b76
...
...
@@ -124,22 +124,23 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
}
}
else
if
(
inputType
==
STREAM_DATA_TYPE_SSDATA_BLOCK
)
{
const
SArray
*
blocks
=
(
const
SArray
*
)
input
;
int32_t
sz
=
taosArrayGetSize
(
blocks
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pBlock
=
taosArrayGet
(
blocks
,
i
);
qSetStreamInput
(
exec
,
pBlock
,
inputType
);
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
{
break
;
}
taosArrayPush
(
pRes
,
output
);
/*int32_t sz = taosArrayGetSize(blocks);*/
/*for (int32_t i = 0; i < sz; i++) {*/
/*SSDataBlock* pBlock = taosArrayGet(blocks, i);*/
/*qSetStreamInput(exec, pBlock, inputType);*/
qSetMultiStreamInput
(
exec
,
blocks
->
pData
,
blocks
->
size
,
STREAM_DATA_TYPE_SSDATA_BLOCK
);
while
(
1
)
{
SSDataBlock
*
output
;
uint64_t
ts
;
if
(
qExecTask
(
exec
,
&
output
,
&
ts
)
<
0
)
{
ASSERT
(
false
);
}
if
(
output
==
NULL
)
{
break
;
}
taosArrayPush
(
pRes
,
output
);
}
/*}*/
}
else
{
ASSERT
(
0
);
}
...
...
tests/script/tsim/tmq/basic1.sim
浏览文件 @
58352b76
...
...
@@ -149,16 +149,16 @@ endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success:
2
0}@ then
if $system_content != @{consume success:
1
0}@ then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 2
0}@ then
return -1
endi
#
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
#
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
#
print cmd result----> $system_content
#if $system_content != @{consume success: 1
0}@ then
#
return -1
#
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
...
...
@@ -167,12 +167,12 @@ if $system_content != @{consume success: 20}@ then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 20}@ then
return -1
endi
#
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
#
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
#
print cmd result----> $system_content
#
if $system_content != @{consume success: 20}@ then
#
return -1
#
endi
print =============== create database , vgroup 4
$dbNamme = d1
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录