Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
6a7f526f
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
6a7f526f
编写于
4月 20, 2022
作者:
L
Liu Jicong
提交者:
GitHub
4月 20, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11693 from taosdata/feature/tq
refactor(tmq): subscribe and rebalance process
上级
29d994d1
558edd37
变更
28
展开全部
隐藏空白更改
内联
并排
Showing
28 changed file
with
2084 addition
and
463 deletion
+2084
-463
example/src/tmq.c
example/src/tmq.c
+2
-1
include/common/tmsg.h
include/common/tmsg.h
+129
-10
include/common/tmsgdef.h
include/common/tmsgdef.h
+2
-0
include/dnode/mnode/sdb/sdb.h
include/dnode/mnode/sdb/sdb.h
+7
-1
include/util/talgo.h
include/util/talgo.h
+5
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
include/util/tarray.h
include/util/tarray.h
+14
-14
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+32
-28
source/client/src/clientMain.c
source/client/src/clientMain.c
+16
-19
source/client/src/tmq.c
source/client/src/tmq.c
+73
-142
source/common/src/tmsg.c
source/common/src/tmsg.c
+2
-4
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
+1
-0
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+6
-5
source/dnode/mnode/impl/inc/mndConsumer.h
source/dnode/mnode/impl/inc/mndConsumer.h
+10
-7
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+132
-2
source/dnode/mnode/impl/inc/mndSubscribe.h
source/dnode/mnode/impl/inc/mndSubscribe.h
+3
-1
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+523
-14
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+397
-0
source/dnode/mnode/impl/src/mndScheduler.c
source/dnode/mnode/impl/src/mndScheduler.c
+30
-5
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+482
-24
source/dnode/mnode/impl/src/mnode.c
source/dnode/mnode/impl/src/mnode.c
+3
-3
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+26
-23
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+114
-133
source/dnode/vnode/src/tq/tqMetaStore.c
source/dnode/vnode/src/tq/tqMetaStore.c
+0
-1
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+9
-1
source/os/src/osFile.c
source/os/src/osFile.c
+20
-20
source/util/src/tarray.c
source/util/src/tarray.c
+43
-3
tests/script/tsim/tmq/basic1.sim
tests/script/tsim/tmq/basic1.sim
+2
-2
未找到文件。
example/src/tmq.c
浏览文件 @
6a7f526f
...
...
@@ -22,6 +22,7 @@
static
int
running
=
1
;
static
void
msg_process
(
TAOS_RES
*
msg
)
{
char
buf
[
1024
];
memset
(
buf
,
0
,
1024
);
printf
(
"topic: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"vg:%d
\n
"
,
tmq_get_vgroup_id
(
msg
));
while
(
1
)
{
...
...
@@ -220,7 +221,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
msg_process
(
tmqmessage
);
tmq_message_destroy
(
tmqmessage
);
if
((
++
msg_count
%
MIN_COMMIT_COUNT
)
==
0
)
tmq_commit
(
tmq
,
NULL
,
0
);
/*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/
}
}
...
...
include/common/tmsg.h
浏览文件 @
6a7f526f
...
...
@@ -324,7 +324,7 @@ typedef struct SEpSet {
int32_t
tEncodeSEpSet
(
SCoder
*
pEncoder
,
const
SEpSet
*
pEp
);
int32_t
tDecodeSEpSet
(
SCoder
*
pDecoder
,
SEpSet
*
pEp
);
int32_t
taosEncodeSEpSet
(
void
**
buf
,
const
SEpSet
*
pEp
);
void
*
taosDecodeSEpSet
(
void
*
buf
,
SEpSet
*
pEp
);
void
*
taosDecodeSEpSet
(
const
void
*
buf
,
SEpSet
*
pEp
);
typedef
struct
{
int8_t
connType
;
...
...
@@ -1291,10 +1291,14 @@ typedef struct {
int32_t
tSerializeSCMCreateTopicRsp
(
void
*
buf
,
int32_t
bufLen
,
const
SCMCreateTopicRsp
*
pRsp
);
int32_t
tDeserializeSCMCreateTopicRsp
(
void
*
buf
,
int32_t
bufLen
,
SCMCreateTopicRsp
*
pRsp
);
typedef
struct
{
int64_t
consumerId
;
}
SMqConsumerLostMsg
;
typedef
struct
{
int32_t
topicNum
;
int64_t
consumerId
;
char
*
consumerGroup
;
char
cgroup
[
TSDB_CGROUP_LEN
]
;
SArray
*
topicNames
;
// SArray<char*>
}
SCMSubscribeReq
;
...
...
@@ -1302,7 +1306,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
topicNum
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
c
onsumerG
roup
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
c
g
roup
);
for
(
int32_t
i
=
0
;
i
<
pReq
->
topicNum
;
i
++
)
{
tlen
+=
taosEncodeString
(
buf
,
(
char
*
)
taosArrayGetP
(
pReq
->
topicNames
,
i
));
...
...
@@ -1313,7 +1317,7 @@ static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubsc
static
FORCE_INLINE
void
*
tDeserializeSCMSubscribeReq
(
void
*
buf
,
SCMSubscribeReq
*
pReq
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
topicNum
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
consumerId
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
consumerG
roup
);
buf
=
taosDecodeString
To
(
buf
,
pReq
->
cg
roup
);
pReq
->
topicNames
=
taosArrayInit
(
pReq
->
topicNum
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
pReq
->
topicNum
;
i
++
)
{
char
*
name
;
...
...
@@ -1389,10 +1393,10 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq
}
typedef
struct
{
c
onst
char
*
key
;
SArray
*
lostConsumers
;
// SArray<int64_t>
SArray
*
removedConsumers
;
// SArray<int64_t>
SArray
*
newConsumers
;
// SArray<int64_t>
c
har
key
[
TSDB_SUBSCRIBE_KEY_LEN
]
;
SArray
*
lostConsumers
;
// SArray<int64_t>
SArray
*
removedConsumers
;
// SArray<int64_t>
SArray
*
newConsumers
;
// SArray<int64_t>
}
SMqRebSubscribe
;
static
FORCE_INLINE
SMqRebSubscribe
*
tNewSMqRebSubscribe
(
const
char
*
key
)
{
...
...
@@ -1400,7 +1404,7 @@ static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
if
(
pRebSub
==
NULL
)
{
goto
_err
;
}
pRebSub
->
key
=
strdup
(
key
);
strcpy
(
pRebSub
->
key
,
key
);
pRebSub
->
lostConsumers
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
if
(
pRebSub
->
lostConsumers
==
NULL
)
{
goto
_err
;
...
...
@@ -1425,6 +1429,7 @@ _err:
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization or
// deserialization
typedef
struct
{
int8_t
*
mqInReb
;
SHashObj
*
rebSubHash
;
// SHashObj<key, SMqRebSubscribe>
}
SMqDoRebalanceMsg
;
...
...
@@ -1927,6 +1932,40 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
return
buf
;
}
typedef
struct
{
int64_t
leftForVer
;
int32_t
vgId
;
int64_t
oldConsumerId
;
int64_t
newConsumerId
;
char
subKey
[
TSDB_SUBSCRIBE_KEY_LEN
];
char
*
qmsg
;
}
SMqRebVgReq
;
static
FORCE_INLINE
int32_t
tEncodeSMqRebVgReq
(
void
**
buf
,
const
SMqRebVgReq
*
pReq
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
leftForVer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
vgId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
oldConsumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
newConsumerId
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
subKey
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
qmsg
);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqRebVgReq
(
const
void
*
buf
,
SMqRebVgReq
*
pReq
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
leftForVer
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
vgId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
oldConsumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
newConsumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pReq
->
subKey
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
qmsg
);
return
(
void
*
)
buf
;
}
typedef
struct
{
int8_t
reserved
;
}
SMqRebVgRsp
;
typedef
struct
{
int64_t
leftForVer
;
int32_t
vgId
;
...
...
@@ -2397,6 +2436,7 @@ typedef struct {
int64_t
consumerId
;
}
SMqRspHead
;
#if 0
typedef struct {
SMsgHead head;
...
...
@@ -2410,6 +2450,17 @@ typedef struct {
uint64_t reqId;
char topic[TSDB_TOPIC_FNAME_LEN];
} SMqPollReq;
#endif
typedef
struct
{
SMsgHead
head
;
char
subKey
[
TSDB_SUBSCRIBE_KEY_LEN
];
int32_t
epoch
;
uint64_t
reqId
;
int64_t
consumerId
;
int64_t
blockingTime
;
int64_t
currentOffset
;
}
SMqPollReqV2
;
typedef
struct
{
int32_t
vgId
;
...
...
@@ -2483,13 +2534,81 @@ static FORCE_INLINE void* tDecodeSMqPollRspV2(const void* buf, SMqPollRspV2* pRs
return
(
void
*
)
buf
;
}
typedef
struct
{
SMqRspHead
head
;
int64_t
reqOffset
;
int64_t
rspOffset
;
int32_t
skipLogNum
;
int32_t
blockNum
;
int8_t
withTbName
;
int8_t
withSchema
;
int8_t
withTag
;
int8_t
withTagSchema
;
SArray
*
blockDataLen
;
// SArray<int32_t>
SArray
*
blockData
;
// SArray<SRetrieveTableRsp*>
SArray
*
blockTbName
;
// SArray<char*>
SArray
*
blockSchema
;
// SArray<SSchemaWrapper>
SArray
*
blockTags
;
// SArray<kvrow>
SArray
*
blockTagSchema
;
// SArray<kvrow>
}
SMqDataBlkRsp
;
static
FORCE_INLINE
int32_t
tEncodeSMqDataBlkRsp
(
void
**
buf
,
const
SMqDataBlkRsp
*
pRsp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
reqOffset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
rspOffset
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
skipLogNum
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
blockNum
);
if
(
pRsp
->
blockNum
!=
0
)
{
tlen
+=
taosEncodeFixedI8
(
buf
,
pRsp
->
withTbName
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pRsp
->
withSchema
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pRsp
->
withTag
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pRsp
->
withTagSchema
);
for
(
int32_t
i
=
0
;
i
<
pRsp
->
blockNum
;
i
++
)
{
int32_t
bLen
=
*
(
int32_t
*
)
taosArrayGet
(
pRsp
->
blockDataLen
,
i
);
void
*
data
=
taosArrayGetP
(
pRsp
->
blockData
,
i
);
tlen
+=
taosEncodeFixedI32
(
buf
,
bLen
);
tlen
+=
taosEncodeBinary
(
buf
,
data
,
bLen
);
}
}
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqDataBlkRsp
(
const
void
*
buf
,
SMqDataBlkRsp
*
pRsp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
reqOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
rspOffset
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
skipLogNum
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
blockNum
);
pRsp
->
blockData
=
taosArrayInit
(
pRsp
->
blockNum
,
sizeof
(
void
*
));
pRsp
->
blockDataLen
=
taosArrayInit
(
pRsp
->
blockNum
,
sizeof
(
void
*
));
if
(
pRsp
->
blockNum
!=
0
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
pRsp
->
withTbName
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pRsp
->
withSchema
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pRsp
->
withTag
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pRsp
->
withTagSchema
);
for
(
int32_t
i
=
0
;
i
<
pRsp
->
blockNum
;
i
++
)
{
int32_t
bLen
=
0
;
void
*
data
=
NULL
;
buf
=
taosDecodeFixedI32
(
buf
,
&
bLen
);
buf
=
taosDecodeBinary
(
buf
,
&
data
,
bLen
);
taosArrayPush
(
pRsp
->
blockDataLen
,
&
bLen
);
taosArrayPush
(
pRsp
->
blockData
,
&
data
);
}
}
return
(
void
*
)
buf
;
}
typedef
struct
{
SMqRspHead
head
;
char
cgroup
[
TSDB_CGROUP_LEN
];
SArray
*
topics
;
// SArray<SMqSubTopicEp>
}
SMqCMGetSubEpRsp
;
static
FORCE_INLINE
void
tDeleteSMqSubTopicEp
(
SMqSubTopicEp
*
pSubTopicEp
)
{
taosArrayDestroy
(
pSubTopicEp
->
vgs
);
}
static
FORCE_INLINE
void
tDeleteSMqSubTopicEp
(
SMqSubTopicEp
*
pSubTopicEp
)
{
// taosMemoryFree(pSubTopicEp->schema.pSchema);
taosArrayDestroy
(
pSubTopicEp
->
vgs
);
}
static
FORCE_INLINE
int32_t
tEncodeSMqSubVgEp
(
void
**
buf
,
const
SMqSubVgEp
*
pVgEp
)
{
int32_t
tlen
=
0
;
...
...
include/common/tmsgdef.h
浏览文件 @
6a7f526f
...
...
@@ -146,6 +146,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_MND_SUBSCRIBE
,
"mnode-subscribe"
,
SCMSubscribeReq
,
SCMSubscribeRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_GET_SUB_EP
,
"mnode-get-sub-ep"
,
SMqCMGetSubEpReq
,
SMqCMGetSubEpRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_TIMER
,
"mnode-mq-tmr"
,
SMTimerReq
,
SMTimerReq
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_CONSUMER_LOST
,
"mnode-mq-consumer-lost"
,
SMTimerReq
,
SMTimerReq
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_DO_REBALANCE
,
"mnode-mq-do-rebalance"
,
SMqDoRebalanceMsg
,
SMqDoRebalanceMsg
)
TD_DEF_MSG_TYPE
(
TDMT_MND_MQ_COMMIT_OFFSET
,
"mnode-mq-commit-offset"
,
SMqCMCommitOffsetReq
,
SMqCMCommitOffsetRsp
)
TD_DEF_MSG_TYPE
(
TDMT_MND_CREATE_STREAM
,
"mnode-create-stream"
,
SCMCreateStreamReq
,
SCMCreateStreamRsp
)
...
...
@@ -177,6 +178,7 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_SET_CONN
,
"vnode-mq-set-conn"
,
SMqSetCVgReq
,
SMqSetCVgRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_REB
,
"vnode-mq-mv-rebalance"
,
SMqMVRebReq
,
SMqMVRebRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_CANCEL_CONN
,
"vnode-mq-mv-cancel-conn"
,
SMqCancelConnReq
,
SMqCancelConnRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_VG_CHANGE
,
"vnode-mq-vg-change"
,
SMqRebVgReq
,
SMqRebVgRsp
)
TD_DEF_MSG_TYPE
(
TDMT_VND_MQ_SET_CUR
,
"vnode-mq-set-cur"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_RES_READY
,
"vnode-res-ready"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_VND_TASKS_STATUS
,
"vnode-tasks-status"
,
NULL
,
NULL
)
...
...
include/dnode/mnode/sdb/sdb.h
浏览文件 @
6a7f526f
...
...
@@ -92,7 +92,13 @@ extern "C" {
typedef
struct
SMnode
SMnode
;
typedef
struct
SSdbRaw
SSdbRaw
;
typedef
struct
SSdbRow
SSdbRow
;
typedef
enum
{
SDB_KEY_BINARY
=
1
,
SDB_KEY_INT32
=
2
,
SDB_KEY_INT64
=
3
}
EKeyType
;
typedef
enum
{
SDB_KEY_BINARY
=
1
,
SDB_KEY_INT32
=
2
,
SDB_KEY_INT64
=
3
,
}
EKeyType
;
typedef
enum
{
SDB_STATUS_INIT
=
0
,
SDB_STATUS_CREATING
=
1
,
...
...
include/util/talgo.h
浏览文件 @
6a7f526f
...
...
@@ -27,6 +27,11 @@ extern "C" {
typedef
int32_t
(
*
__compar_fn_t
)(
const
void
*
,
const
void
*
);
#endif
typedef
void
*
(
*
FCopy
)(
void
*
);
typedef
void
(
*
FDelete
)(
void
*
);
typedef
int32_t
(
*
FEncode
)(
void
**
buf
,
const
void
*
dst
);
typedef
void
*
(
*
FDecode
)(
const
void
*
buf
,
void
*
dst
);
#define TD_EQ 0x1
#define TD_GT 0x2
#define TD_LT 0x4
...
...
include/util/taoserror.h
浏览文件 @
6a7f526f
...
...
@@ -279,6 +279,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_UNSUPPORTED_TOPIC TAOS_DEF_ERROR_CODE(0, 0x03E8)
#define TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E9)
#define TSDB_CODE_MND_OFFSET_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03EA)
#define TSDB_CODE_MND_CONSUMER_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x03EB)
// mnode-stream
#define TSDB_CODE_MND_STREAM_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03F0)
...
...
include/util/tarray.h
浏览文件 @
6a7f526f
...
...
@@ -41,10 +41,10 @@ extern "C" {
#define TARRAY_GET_START(array) ((array)->pData)
typedef
struct
SArray
{
size_t
size
;
size_t
size
;
uint32_t
capacity
;
uint32_t
elemSize
;
void
*
pData
;
void
*
pData
;
}
SArray
;
/**
...
...
@@ -199,6 +199,13 @@ SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize);
*/
SArray
*
taosArrayDup
(
const
SArray
*
pSrc
);
/**
* deep copy a new array
* @param pSrc
*/
SArray
*
taosArrayDeepCopy
(
const
SArray
*
pSrc
,
FCopy
deepCopy
);
/**
* clear the array (remove all element)
* @param pArray
...
...
@@ -212,19 +219,9 @@ void taosArrayClear(SArray* pArray);
*/
void
taosArrayClearEx
(
SArray
*
pArray
,
void
(
*
fp
)(
void
*
));
/**
* destroy array list
* @param pArray
*/
void
*
taosArrayDestroy
(
SArray
*
pArray
);
/**
*
* @param pArray
* @param fp
*/
void
taosArrayDestroyEx
(
SArray
*
pArray
,
void
(
*
fp
)(
void
*
));
void
taosArrayDestroyP
(
SArray
*
pArray
,
FDelete
fp
);
void
taosArrayDestroyEx
(
SArray
*
pArray
,
FDelete
fp
);
/**
* sort the array
...
...
@@ -272,6 +269,9 @@ char* taosArraySearchString(const SArray* pArray, const char* key, __compar_fn_t
void
taosArraySortPWithExt
(
SArray
*
pArray
,
__ext_compar_fn_t
fn
,
const
void
*
param
);
int32_t
taosEncodeArray
(
void
**
buf
,
const
SArray
*
pArray
,
FEncode
encode
);
void
*
taosDecodeArray
(
const
void
*
buf
,
SArray
**
pArray
,
FDecode
decode
,
int32_t
dataSz
);
#ifdef __cplusplus
}
#endif
...
...
source/client/inc/clientInt.h
浏览文件 @
6a7f526f
...
...
@@ -44,7 +44,7 @@ extern "C" {
} while (0)
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
#define HEARTBEAT_INTERVAL 1500 // ms
#define HEARTBEAT_INTERVAL
1500 // ms
enum
{
RES_TYPE__QUERY
=
1
,
...
...
@@ -187,11 +187,13 @@ typedef struct SRequestSendRecvBody {
}
SRequestSendRecvBody
;
typedef
struct
{
int8_t
resType
;
char
*
topic
;
SArray
*
res
;
// SArray<SReqResultInfo>
int32_t
resIter
;
int32_t
vgId
;
int8_t
resType
;
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
int32_t
vgId
;
SSchemaWrapper
schema
;
int32_t
resIter
;
SMqDataBlkRsp
rsp
;
SReqResultInfo
resInfo
;
}
SMqRspObj
;
typedef
struct
SRequestObj
{
...
...
@@ -211,16 +213,24 @@ typedef struct SRequestObj {
SRequestSendRecvBody
body
;
}
SRequestObj
;
void
*
doFetchRows
(
SRequestObj
*
pRequest
,
bool
setupOneRowPtr
,
bool
convertUcs4
);
void
doSetOneRowPtr
(
SReqResultInfo
*
pResultInfo
);
void
setResPrecision
(
SReqResultInfo
*
pResInfo
,
int32_t
precision
);
int32_t
setQueryResultFromRsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
,
bool
convertUcs4
);
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
);
static
FORCE_INLINE
SReqResultInfo
*
tmqGetCurResInfo
(
TAOS_RES
*
res
)
{
SMqRspObj
*
msg
=
(
SMqRspObj
*
)
res
;
int32_t
resIter
=
msg
->
resIter
==
-
1
?
0
:
msg
->
resIter
;
return
(
SReqResultInfo
*
)
taosArrayGet
(
msg
->
res
,
resIter
);
return
(
SReqResultInfo
*
)
&
msg
->
resInfo
;
}
static
FORCE_INLINE
SReqResultInfo
*
tmqGetNextResInfo
(
TAOS_RES
*
res
)
{
static
FORCE_INLINE
SReqResultInfo
*
tmqGetNextResInfo
(
TAOS_RES
*
res
,
bool
convertUcs4
)
{
SMqRspObj
*
msg
=
(
SMqRspObj
*
)
res
;
if
(
++
msg
->
resIter
<
taosArrayGetSize
(
msg
->
res
))
{
return
(
SReqResultInfo
*
)
taosArrayGet
(
msg
->
res
,
msg
->
resIter
);
msg
->
resIter
++
;
if
(
msg
->
resIter
<
msg
->
rsp
.
blockNum
)
{
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
taosArrayGetP
(
msg
->
rsp
.
blockData
,
msg
->
resIter
);
setQueryResultFromRsp
(
&
msg
->
resInfo
,
pRetrieve
,
convertUcs4
);
return
&
msg
->
resInfo
;
}
return
NULL
;
}
...
...
@@ -238,25 +248,25 @@ extern int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t
int
genericRspCallback
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
);
SMsgSendInfo
*
buildMsgInfoImpl
(
SRequestObj
*
pReqObj
);
int
taos_init
();
int
taos_init
();
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
SAppInstInfo
*
pAppInfo
);
void
destroyTscObj
(
void
*
pObj
);
STscObj
*
acquireTscObj
(
int64_t
rid
);
int32_t
releaseTscObj
(
int64_t
rid
);
void
*
createTscObj
(
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
SAppInstInfo
*
pAppInfo
);
void
destroyTscObj
(
void
*
pObj
);
STscObj
*
acquireTscObj
(
int64_t
rid
);
int32_t
releaseTscObj
(
int64_t
rid
);
uint64_t
generateRequestId
();
void
*
createRequest
(
STscObj
*
pObj
,
__taos_async_fn_t
fp
,
void
*
param
,
int32_t
type
);
void
destroyRequest
(
SRequestObj
*
pRequest
);
SRequestObj
*
acquireRequest
(
int64_t
rid
);
int32_t
releaseRequest
(
int64_t
rid
);
void
*
createRequest
(
STscObj
*
pObj
,
__taos_async_fn_t
fp
,
void
*
param
,
int32_t
type
);
void
destroyRequest
(
SRequestObj
*
pRequest
);
SRequestObj
*
acquireRequest
(
int64_t
rid
);
int32_t
releaseRequest
(
int64_t
rid
);
char
*
getDbOfConnection
(
STscObj
*
pObj
);
void
setConnectionDB
(
STscObj
*
pTscObj
,
const
char
*
db
);
void
resetConnectDB
(
STscObj
*
pTscObj
);
int
taos_options_imp
(
TSDB_OPTION
option
,
const
char
*
str
);
int
taos_options_imp
(
TSDB_OPTION
option
,
const
char
*
str
);
void
*
openTransporter
(
const
char
*
user
,
const
char
*
auth
,
int32_t
numOfThreads
);
...
...
@@ -273,12 +283,6 @@ int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArra
int32_t
buildRequest
(
STscObj
*
pTscObj
,
const
char
*
sql
,
int
sqlLen
,
SRequestObj
**
pRequest
);
void
*
doFetchRows
(
SRequestObj
*
pRequest
,
bool
setupOneRowPtr
,
bool
convertUcs4
);
void
doSetOneRowPtr
(
SReqResultInfo
*
pResultInfo
);
void
setResSchemaInfo
(
SReqResultInfo
*
pResInfo
,
const
SSchema
*
pSchema
,
int32_t
numOfCols
);
void
setResPrecision
(
SReqResultInfo
*
pResInfo
,
int32_t
precision
);
int32_t
setQueryResultFromRsp
(
SReqResultInfo
*
pResultInfo
,
const
SRetrieveTableRsp
*
pRsp
,
bool
convertUcs4
);
// --- heartbeat
// global, called by mgmt
int
hbMgrInit
();
...
...
@@ -290,7 +294,7 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char* key);
void
appHbMgrCleanup
(
void
);
// conn level
int
hbRegisterConn
(
SAppHbMgr
*
pAppHbMgr
,
int64_t
tscRefId
,
int64_t
clusterId
,
int8_t
connType
);
int
hbRegisterConn
(
SAppHbMgr
*
pAppHbMgr
,
int64_t
tscRefId
,
int64_t
clusterId
,
int8_t
connType
);
void
hbDeregisterConn
(
SAppHbMgr
*
pAppHbMgr
,
SClientHbKey
connKey
);
int
hbAddConnInfo
(
SAppHbMgr
*
pAppHbMgr
,
SClientHbKey
connKey
,
void
*
key
,
void
*
value
,
int32_t
keyLen
,
int32_t
valueLen
);
...
...
source/client/src/clientMain.c
浏览文件 @
6a7f526f
...
...
@@ -14,12 +14,12 @@
*/
#include "catalog.h"
#include "scheduler.h"
#include "clientInt.h"
#include "clientStmt.h"
#include "clientLog.h"
#include "clientStmt.h"
#include "os.h"
#include "query.h"
#include "scheduler.h"
#include "tglobal.h"
#include "tmsg.h"
#include "tref.h"
...
...
@@ -177,25 +177,24 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return
doFetchRows
(
pRequest
,
true
,
true
);
}
else
if
(
TD_RES_TMQ
(
res
))
{
SMqRspObj
*
msg
=
((
SMqRspObj
*
)
res
);
if
(
msg
->
resIter
==
-
1
)
msg
->
resIter
++
;
SReqResultInfo
*
pResultInfo
=
taosArrayGet
(
msg
->
res
,
msg
->
resIter
);
SMqRspObj
*
msg
=
((
SMqRspObj
*
)
res
);
SReqResultInfo
*
pResultInfo
;
if
(
msg
->
resIter
==
-
1
)
{
pResultInfo
=
tmqGetNextResInfo
(
res
,
true
);
}
else
{
pResultInfo
=
tmqGetCurResInfo
(
res
);
}
if
(
pResultInfo
->
current
<
pResultInfo
->
numOfRows
)
{
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
return
pResultInfo
->
row
;
}
else
{
msg
->
resIter
++
;
if
(
msg
->
resIter
<
taosArrayGetSize
(
msg
->
res
))
{
pResultInfo
=
taosArrayGet
(
msg
->
res
,
msg
->
resIter
);
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
return
pResultInfo
->
row
;
}
else
{
return
NULL
;
}
pResultInfo
=
tmqGetNextResInfo
(
res
,
true
);
if
(
pResultInfo
==
NULL
)
return
NULL
;
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
return
pResultInfo
->
row
;
}
}
else
{
// assert to avoid un-initialization error
ASSERT
(
0
);
...
...
@@ -455,7 +454,7 @@ int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
(
*
numOfRows
)
=
pResultInfo
->
numOfRows
;
return
pRequest
->
code
;
}
else
if
(
TD_RES_TMQ
(
res
))
{
SReqResultInfo
*
pResultInfo
=
tmqGetNextResInfo
(
res
);
SReqResultInfo
*
pResultInfo
=
tmqGetNextResInfo
(
res
,
true
);
if
(
pResultInfo
==
NULL
)
return
-
1
;
pResultInfo
->
current
=
pResultInfo
->
numOfRows
;
...
...
@@ -474,7 +473,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
}
if
(
TD_RES_TMQ
(
res
))
{
SReqResultInfo
*
pResultInfo
=
tmqGetNextResInfo
(
res
);
SReqResultInfo
*
pResultInfo
=
tmqGetNextResInfo
(
res
,
false
);
if
(
pResultInfo
==
NULL
)
{
(
*
numOfRows
)
=
0
;
return
0
;
...
...
@@ -710,10 +709,8 @@ int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
return
stmtBindBatch
(
stmt
,
bind
);
}
TAOS_RES
*
taos_schemaless_insert
(
TAOS
*
taos
,
char
*
lines
[],
int
numLines
,
int
protocol
,
int
precision
)
{
// TODO
return
NULL
;
}
source/client/src/tmq.c
浏览文件 @
6a7f526f
...
...
@@ -72,25 +72,25 @@ struct tmq_conf_t {
struct
tmq_t
{
// conf
char
groupId
[
TSDB_CGROUP_LEN
];
char
clientId
[
256
];
int8_t
autoCommit
;
int8_t
inWaiting
;
char
groupId
[
TSDB_CGROUP_LEN
];
char
clientId
[
256
];
int8_t
autoCommit
;
/*int8_t inWaiting;*/
int64_t
consumerId
;
int32_t
epoch
;
int32_t
resetOffsetCfg
;
int64_t
status
;
STscObj
*
pTscObj
;
tmq_commit_cb
*
commit_cb
;
int32_t
nextTopicIdx
;
int8_t
epStatus
;
int32_t
epSkipCnt
;
int32_t
waitingRequest
;
int32_t
readyRequest
;
SArray
*
clientTopics
;
// SArray<SMqClientTopic>
STaosQueue
*
mqueue
;
// queue of tmq_message_t
STaosQall
*
qall
;
tsem_t
rspSem
;
/*int32_t nextTopicIdx;*/
int8_t
epStatus
;
int32_t
epSkipCnt
;
/*int32_t waitingRequest;*/
/*int32_t readyRequest;*/
SArray
*
clientTopics
;
// SArray<SMqClientTopic>
STaosQueue
*
mqueue
;
// queue of tmq_message_t
STaosQall
*
qall
;
tsem_t
rspSem
;
// stat
int64_t
pollCnt
;
};
...
...
@@ -134,7 +134,7 @@ typedef struct {
int32_t
epoch
;
SMqClientVg
*
vgHandle
;
SMqClientTopic
*
topicHandle
;
SMq
PollRspV2
msg
;
SMq
DataBlkRsp
msg
;
}
SMqPollRspWrapper
;
typedef
struct
{
...
...
@@ -145,6 +145,7 @@ typedef struct {
typedef
struct
{
tmq_t
*
tmq
;
int32_t
code
;
int32_t
sync
;
tsem_t
rspSem
;
}
SMqAskEpCbParam
;
...
...
@@ -255,7 +256,12 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
void
tmq_list_destroy
(
tmq_list_t
*
list
)
{
SArray
*
container
=
&
list
->
container
;
/*taosArrayDestroy(container);*/
taosArrayDestroyEx
(
container
,
(
void
(
*
)(
void
*
))
taosMemoryFree
);
int32_t
sz
=
taosArrayGetSize
(
container
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
str
=
taosArrayGetP
(
container
,
i
);
taosMemoryFree
(
str
);
}
taosArrayDestroy
(
container
);
}
static
int32_t
tmqMakeTopicVgKey
(
char
*
dst
,
const
char
*
topicName
,
int32_t
vg
)
{
...
...
@@ -322,12 +328,12 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
return
NULL
;
}
pTmq
->
pTscObj
=
(
STscObj
*
)
conn
;
pTmq
->
inWaiting
=
0
;
/*pTmq->inWaiting = 0;*/
pTmq
->
status
=
0
;
pTmq
->
pollCnt
=
0
;
pTmq
->
epoch
=
0
;
pTmq
->
waitingRequest
=
0
;
pTmq
->
readyRequest
=
0
;
/*pTmq->waitingRequest = 0;*/
/*pTmq->readyRequest = 0;*/
pTmq
->
epStatus
=
0
;
pTmq
->
epSkipCnt
=
0
;
// set conf
...
...
@@ -367,12 +373,12 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq
->
pTscObj
=
taos_connect_internal
(
conf
->
ip
,
user
,
pass
,
NULL
,
conf
->
db
,
conf
->
port
,
CONN_TYPE__TMQ
);
if
(
pTmq
->
pTscObj
==
NULL
)
return
NULL
;
pTmq
->
inWaiting
=
0
;
/*pTmq->inWaiting = 0;*/
pTmq
->
status
=
0
;
pTmq
->
pollCnt
=
0
;
pTmq
->
epoch
=
0
;
pTmq
->
waitingRequest
=
0
;
pTmq
->
readyRequest
=
0
;
/*pTmq->waitingRequest = 0;*/
/*pTmq->readyRequest = 0;*/
pTmq
->
epStatus
=
0
;
pTmq
->
epSkipCnt
=
0
;
// set conf
...
...
@@ -496,7 +502,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SCMSubscribeReq
req
;
req
.
topicNum
=
sz
;
req
.
consumerId
=
tmq
->
consumerId
;
req
.
consumerGroup
=
strdup
(
tmq
->
groupId
);
strcpy
(
req
.
cgroup
,
tmq
->
groupId
);
req
.
topicNames
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
...
...
@@ -857,7 +863,6 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
#endif
int32_t
tmqPollCb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
/*printf("recv poll\n");*/
SMqPollCbParam
*
pParam
=
(
SMqPollCbParam
*
)
param
;
SMqClientVg
*
pVg
=
pParam
->
pVg
;
SMqClientTopic
*
pTopic
=
pParam
->
pTopic
;
...
...
@@ -870,17 +875,15 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t
msgEpoch
=
((
SMqRspHead
*
)
pMsg
->
pData
)
->
epoch
;
int32_t
tmqEpoch
=
atomic_load_32
(
&
tmq
->
epoch
);
if
(
msgEpoch
<
tmqEpoch
)
{
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
/*tsem_post(&tmq->rspSem);*/
// do not write into queue since updating epoch reset
tscWarn
(
"msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d"
,
pParam
->
vgId
,
msgEpoch
,
tmqEpoch
);
/*tsem_post(&tmq->rspSem);*/
return
0
;
}
if
(
msgEpoch
!=
tmqEpoch
)
{
tscWarn
(
"mismatch rsp from vg %d, epoch %d, current epoch %d"
,
pParam
->
vgId
,
msgEpoch
,
tmqEpoch
);
}
else
{
atomic_sub_fetch_32
(
&
tmq
->
waitingRequest
,
1
);
}
#if 0
...
...
@@ -902,45 +905,33 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
#endif
/*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/
/*tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));*/
SMqPollRspWrapper
*
pRspWrapper
=
taosAllocateQitem
(
sizeof
(
SMqPollRspWrapper
));
if
(
pRspWrapper
==
NULL
)
{
tscWarn
(
"msg discard from vg %d, epoch %d since out of memory"
,
pParam
->
vgId
,
pParam
->
epoch
);
goto
CREATE_MSG_FAIL
;
}
pRspWrapper
->
tmqRspType
=
TMQ_MSG_TYPE__POLL_RSP
;
pRspWrapper
->
vgHandle
=
pVg
;
pRspWrapper
->
topicHandle
=
pTopic
;
/*memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));*/
memcpy
(
&
pRspWrapper
->
msg
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqPollRspV2
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRspWrapper
->
msg
);
// TODO: alloc mem
/*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
#if 0
if (pRsp->msg.numOfTopics == 0) {
/*printf("no data\n");*/
taosFreeQitem(pRsp);
goto CREATE_MSG_FAIL;
}
#endif
tDecodeSMqDataBlkRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRspWrapper
->
msg
);
tscDebug
(
"consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld"
,
tmq
->
consumerId
,
pVg
->
vgId
,
pRspWrapper
->
msg
.
reqOffset
,
pRspWrapper
->
msg
.
rspOffset
);
taosWriteQitem
(
tmq
->
mqueue
,
pRspWrapper
);
atomic_add_fetch_32
(
&
tmq
->
readyRequest
,
1
);
/*tsem_post(&tmq->rspSem);*/
return
0
;
return
0
;
CREATE_MSG_FAIL:
if
(
pParam
->
epoch
==
tmq
->
epoch
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
}
/*tsem_post(&tmq->rspSem);*/
return
code
;
return
-
1
;
}
bool
tmqUpdateEp
(
tmq_t
*
tmq
,
int32_t
epoch
,
SMqCMGetSubEpRsp
*
pRsp
)
{
...
...
@@ -1023,6 +1014,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
int32_t
tmqAskEpCb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SMqAskEpCbParam
*
pParam
=
(
SMqAskEpCbParam
*
)
param
;
tmq_t
*
tmq
=
pParam
->
tmq
;
pParam
->
code
=
code
;
if
(
code
!=
0
)
{
tscError
(
"consumer %ld get topic endpoint error, not ready, wait:%d"
,
tmq
->
consumerId
,
pParam
->
sync
);
goto
END
;
...
...
@@ -1062,6 +1054,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
taosWriteQitem
(
tmq
->
mqueue
,
pWrapper
);
/*tsem_post(&tmq->rspSem);*/
taosMemoryFree
(
pParam
);
}
END:
...
...
@@ -1073,7 +1066,8 @@ END:
}
int32_t
tmqAskEp
(
tmq_t
*
tmq
,
bool
sync
)
{
int8_t
epStatus
=
atomic_val_compare_exchange_8
(
&
tmq
->
epStatus
,
0
,
1
);
int32_t
code
=
0
;
int8_t
epStatus
=
atomic_val_compare_exchange_8
(
&
tmq
->
epStatus
,
0
,
1
);
if
(
epStatus
==
1
)
{
int32_t
epSkipCnt
=
atomic_add_fetch_32
(
&
tmq
->
epSkipCnt
,
1
);
tscTrace
(
"consumer %ld skip ask ep cnt %d"
,
tmq
->
consumerId
,
epSkipCnt
);
...
...
@@ -1130,8 +1124,12 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
if
(
sync
)
tsem_wait
(
&
pParam
->
rspSem
);
return
0
;
if
(
sync
)
{
tsem_wait
(
&
pParam
->
rspSem
);
code
=
pParam
->
code
;
taosMemoryFree
(
pParam
);
}
return
code
;
}
tmq_resp_err_t
tmq_seek
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_t
*
offset
)
{
...
...
@@ -1157,7 +1155,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) {
return
TMQ_RESP_ERR__FAIL
;
}
SMqPollReq
*
tmqBuildConsumeReqImpl
(
tmq_t
*
tmq
,
int64_t
blockingTime
,
SMqClientTopic
*
pTopic
,
SMqClientVg
*
pVg
)
{
SMqPollReq
V2
*
tmqBuildConsumeReqImpl
(
tmq_t
*
tmq
,
int64_t
blockingTime
,
SMqClientTopic
*
pTopic
,
SMqClientVg
*
pVg
)
{
int64_t
reqOffset
;
if
(
pVg
->
currentOffset
>=
0
)
{
reqOffset
=
pVg
->
currentOffset
;
...
...
@@ -1169,13 +1167,18 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo
reqOffset
=
tmq
->
resetOffsetCfg
;
}
SMqPollReq
*
pReq
=
taosMemoryMalloc
(
sizeof
(
SMqPollReq
));
SMqPollReq
V2
*
pReq
=
taosMemoryMalloc
(
sizeof
(
SMqPollReqV2
));
if
(
pReq
==
NULL
)
{
return
NULL
;
}
strcpy
(
pReq
->
topic
,
pTopic
->
topicName
);
strcpy
(
pReq
->
cgroup
,
tmq
->
groupId
);
/*strcpy(pReq->topic, pTopic->topicName);*/
/*strcpy(pReq->cgroup, tmq->groupId);*/
int32_t
tlen
=
strlen
(
tmq
->
groupId
);
memcpy
(
pReq
->
subKey
,
tmq
->
groupId
,
tlen
);
pReq
->
subKey
[
tlen
]
=
TMQ_SEPARATOR
;
strcpy
(
pReq
->
subKey
+
tlen
+
1
,
pTopic
->
topicName
);
pReq
->
blockingTime
=
blockingTime
;
pReq
->
consumerId
=
tmq
->
consumerId
;
...
...
@@ -1184,101 +1187,26 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo
pReq
->
reqId
=
generateRequestId
();
pReq
->
head
.
vgId
=
htonl
(
pVg
->
vgId
);
pReq
->
head
.
contLen
=
htonl
(
sizeof
(
SMqPollReq
));
pReq
->
head
.
contLen
=
htonl
(
sizeof
(
SMqPollReq
V2
));
return
pReq
;
}
SMqRspObj
*
tmqBuildRspFromWrapper
(
SMqPollRspWrapper
*
pWrapper
)
{
SMqRspObj
*
pRspObj
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqRspObj
));
pRspObj
->
resType
=
RES_TYPE__TMQ
;
pRspObj
->
topic
=
strdup
(
pWrapper
->
topicHandle
->
topicName
);
pRspObj
->
resIter
=
-
1
;
strncpy
(
pRspObj
->
topic
,
pWrapper
->
topicHandle
->
topicName
,
TSDB_TOPIC_FNAME_LEN
);
pRspObj
->
vgId
=
pWrapper
->
vgHandle
->
vgId
;
SMqPollRspV2
*
pRsp
=
&
pWrapper
->
msg
;
int32_t
blockNum
=
taosArrayGetSize
(
pRsp
->
blockPos
);
pRspObj
->
res
=
taosArrayInit
(
blockNum
,
sizeof
(
SReqResultInfo
));
for
(
int32_t
i
=
0
;
i
<
blockNum
;
i
++
)
{
int32_t
pos
=
*
(
int32_t
*
)
taosArrayGet
(
pRsp
->
blockPos
,
i
);
SRetrieveTableRsp
*
pRetrieve
=
POINTER_SHIFT
(
pRsp
->
blockData
,
pos
);
SReqResultInfo
resInfo
=
{
0
};
resInfo
.
totalRows
=
0
;
resInfo
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
setResSchemaInfo
(
&
resInfo
,
pWrapper
->
topicHandle
->
schema
.
pSchema
,
pWrapper
->
topicHandle
->
schema
.
nCols
);
setQueryResultFromRsp
(
&
resInfo
,
pRetrieve
,
true
);
taosArrayPush
(
pRspObj
->
res
,
&
resInfo
);
}
return
pRspObj
;
}
#if 0
tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
tmq_message_t* msg = NULL;
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
/*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/
/*continue;*/
/*}*/
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
if (pReq == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
return NULL;
}
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (pParam == NULL) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
// TODO: out of mem
return NULL;
}
pParam->tmq = tmq;
pParam->pVg = pVg;
pParam->epoch = tmq->epoch;
pParam->sync = 1;
pParam->msg = &msg;
tsem_init(&pParam->rspSem, 0, 0);
SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
return NULL;
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqPollReq),
.handle = NULL,
};
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_CONSUME;
pRspObj
->
resIter
=
-
1
;
memcpy
(
&
pRspObj
->
rsp
,
&
pWrapper
->
msg
,
sizeof
(
SMqDataBlkRsp
));
int64_t transporterId = 0;
/*printf("send poll\n");*/
atomic_add_fetch_32(&tmq->waitingRequest, 1);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++;
tmq->pollCnt++;
/*SRetrieveTableRsp* pRetrieve = taosArrayGetP(pWrapper->msg.blockData, 0);*/
pRspObj
->
resInfo
.
totalRows
=
0
;
pRspObj
->
resInfo
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
setResSchemaInfo
(
&
pRspObj
->
resInfo
,
pWrapper
->
topicHandle
->
schema
.
pSchema
,
pWrapper
->
topicHandle
->
schema
.
nCols
);
tsem_wait(&pParam->rspSem);
tmq_message_t* nmsg = NULL;
while (1) {
taosReadQitem(tmq->mqueue, (void**)&nmsg);
if (nmsg == NULL) continue;
while (nmsg->head.mqMsgType != TMQ_MSG_TYPE__POLL_RSP) {
taosReadQitem(tmq->mqueue, (void**)&nmsg);
}
return nmsg;
}
}
}
return NULL;
taosFreeQitem
(
pWrapper
);
return
pRspObj
;
}
#endif
int32_t
tmqPollImpl
(
tmq_t
*
tmq
,
int64_t
blockingTime
)
{
/*printf("call poll\n");*/
...
...
@@ -1301,7 +1229,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
#endif
}
atomic_store_32
(
&
pVg
->
vgSkipCnt
,
0
);
SMqPollReq
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
blockingTime
,
pTopic
,
pVg
);
SMqPollReq
V2
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
blockingTime
,
pTopic
,
pVg
);
if
(
pReq
==
NULL
)
{
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
/*tsem_post(&tmq->rspSem);*/
...
...
@@ -1332,7 +1260,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
sendInfo
->
msgInfo
=
(
SDataBuf
){
.
pData
=
pReq
,
.
len
=
sizeof
(
SMqPollReq
),
.
len
=
sizeof
(
SMqPollReq
V2
),
.
handle
=
NULL
,
};
sendInfo
->
requestId
=
pReq
->
reqId
;
...
...
@@ -1343,7 +1271,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t
transporterId
=
0
;
/*printf("send poll\n");*/
atomic_add_fetch_32
(
&
tmq
->
waitingRequest
,
1
);
/*atomic_add_fetch_32(&tmq->waitingRequest, 1);*/
tscDebug
(
"consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
tmq
->
epoch
,
pVg
->
currentOffset
,
pReq
->
reqId
);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
...
...
@@ -1385,7 +1313,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
if
(
rspWrapper
->
tmqRspType
==
TMQ_MSG_TYPE__POLL_RSP
)
{
SMqPollRspWrapper
*
pollRspWrapper
=
(
SMqPollRspWrapper
*
)
rspWrapper
;
atomic_sub_fetch_32
(
&
tmq
->
readyRequest
,
1
);
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
/*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
if
(
pollRspWrapper
->
msg
.
head
.
epoch
==
atomic_load_32
(
&
tmq
->
epoch
))
{
/*printf("epoch match\n");*/
...
...
@@ -1393,7 +1321,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg
->
currentOffset
=
pollRspWrapper
->
msg
.
rspOffset
;
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
if
(
pollRspWrapper
->
msg
.
dataLen
==
0
)
{
if
(
pollRspWrapper
->
msg
.
blockNum
==
0
)
{
taosFreeQitem
(
pollRspWrapper
);
rspWrapper
=
NULL
;
continue
;
...
...
@@ -1449,7 +1377,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
// TODO: put into another thread or delayed queue
int64_t
status
=
atomic_load_64
(
&
tmq
->
status
);
tmqAskEp
(
tmq
,
status
==
TMQ_CONSUMER_STATUS__INIT
);
while
(
0
!=
tmqAskEp
(
tmq
,
status
==
TMQ_CONSUMER_STATUS__INIT
))
{
tscDebug
(
"not ready, retry
\n
"
);
taosSsleep
(
1
);
}
rspObj
=
tmqHandleAllRsp
(
tmq
,
blocking_time
,
false
);
if
(
rspObj
)
{
...
...
source/common/src/tmsg.c
浏览文件 @
6a7f526f
...
...
@@ -125,14 +125,14 @@ int32_t taosEncodeSEpSet(void **buf, const SEpSet *pEp) {
return
tlen
;
}
void
*
taosDecodeSEpSet
(
void
*
buf
,
SEpSet
*
pEp
)
{
void
*
taosDecodeSEpSet
(
const
void
*
buf
,
SEpSet
*
pEp
)
{
buf
=
taosDecodeFixedI8
(
buf
,
&
pEp
->
inUse
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pEp
->
numOfEps
);
for
(
int32_t
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
i
++
)
{
buf
=
taosDecodeFixedU16
(
buf
,
&
pEp
->
eps
[
i
].
port
);
buf
=
taosDecodeStringTo
(
buf
,
pEp
->
eps
[
i
].
fqdn
);
}
return
buf
;
return
(
void
*
)
buf
;
}
static
int32_t
tSerializeSClientHbReq
(
SCoder
*
pEncoder
,
const
SClientHbReq
*
pReq
)
{
...
...
@@ -3599,8 +3599,6 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if
(
tEncodeI8
(
&
encoder
,
pReq
->
igExists
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
sqlLen
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
&
encoder
,
astLen
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
&
encoder
,
pReq
->
triggerType
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
&
encoder
,
pReq
->
watermark
)
<
0
)
return
-
1
;
if
(
sqlLen
>
0
&&
tEncodeCStr
(
&
encoder
,
pReq
->
sql
)
<
0
)
return
-
1
;
if
(
astLen
>
0
&&
tEncodeCStr
(
&
encoder
,
pReq
->
ast
)
<
0
)
return
-
1
;
...
...
source/dnode/mgmt/mgmt_mnode/src/mmHandle.c
浏览文件 @
6a7f526f
...
...
@@ -211,6 +211,7 @@ void mmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_SUBSCRIBE
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_MQ_COMMIT_OFFSET
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_GET_SUB_EP
,
mmProcessReadMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_VG_CHANGE_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_CREATE_STREAM
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_DEPLOY_RSP
,
mmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_MND_GET_DB_CFG
,
mmProcessReadMsg
,
DEFAULT_HANDLE
);
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
6a7f526f
...
...
@@ -252,7 +252,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_QUERY
,
vmProcessQueryMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_CONNECT
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_DISCONNECT
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_SET_CUR
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
//
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_RES_READY
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_TASKS_STATUS
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_CANCEL_TASK
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
...
...
@@ -265,10 +265,11 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_CREATE_SMA
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_CANCEL_SMA
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_DROP_SMA
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_SET_CONN
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_REB
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_CANCEL_CONN
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_SET_CUR
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
//dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN, vmProcessWriteMsg, DEFAULT_HANDLE);
//dmSetMsgHandle(pWrapper, TDMT_VND_MQ_REB, vmProcessWriteMsg, DEFAULT_HANDLE);
//dmSetMsgHandle(pWrapper, TDMT_VND_MQ_CANCEL_CONN, vmProcessWriteMsg, DEFAULT_HANDLE);
//dmSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CUR, vmProcessFetchMsg, DEFAULT_HANDLE);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_MQ_VG_CHANGE
,
(
NodeMsgFp
)
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_CONSUME
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_TASK_DEPLOY
,
vmProcessWriteMsg
,
DEFAULT_HANDLE
);
dmSetMsgHandle
(
pWrapper
,
TDMT_VND_QUERY_HEARTBEAT
,
vmProcessFetchMsg
,
DEFAULT_HANDLE
);
...
...
source/dnode/mnode/impl/inc/mndConsumer.h
浏览文件 @
6a7f526f
...
...
@@ -22,27 +22,30 @@
extern
"C"
{
#endif
enum
{
MQ_CONSUMER_STATUS__INIT
=
1
,
MQ_CONSUMER_STATUS__IDLE
,
MQ_CONSUMER_STATUS__ACTIVE
,
// MQ_CONSUMER_STATUS__INIT = 1,
MQ_CONSUMER_STATUS__MODIFY
=
1
,
MQ_CONSUMER_STATUS__MODIFY_IN_REB
,
// MQ_CONSUMER_STATUS__IDLE,
MQ_CONSUMER_STATUS__READY
,
MQ_CONSUMER_STATUS__LOST
,
MQ_CONSUMER_STATUS__MODIFY
MQ_CONSUMER_STATUS__LOST_IN_REB
,
MQ_CONSUMER_STATUS__LOST_REBD
,
};
int32_t
mndInitConsumer
(
SMnode
*
pMnode
);
void
mndCleanupConsumer
(
SMnode
*
pMnode
);
SMqConsumerObj
*
mndAcquireConsumer
(
SMnode
*
pMnode
,
int64_t
consumerId
);
void
mndReleaseConsumer
(
SMnode
*
pMnode
,
SMqConsumerObj
*
pConsumer
);
SMqConsumerObj
*
mndCreateConsumer
(
int64_t
consumerId
,
const
char
*
cgroup
);
SMqConsumerObj
*
mndCreateConsumer
(
int64_t
consumerId
,
const
char
*
cgroup
);
SSdbRaw
*
mndConsumerActionEncode
(
SMqConsumerObj
*
pConsumer
);
SSdbRow
*
mndConsumerActionDecode
(
SSdbRaw
*
pRaw
);
int32_t
mndSetConsumerCommitLogs
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqConsumerObj
*
pConsumer
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
6a7f526f
...
...
@@ -88,6 +88,7 @@ typedef enum {
TRN_TYPE_CREATE_STREAM
=
1019
,
TRN_TYPE_DROP_STREAM
=
1020
,
TRN_TYPE_ALTER_STREAM
=
1021
,
TRN_TYPE_CONSUMER_LOST
=
1022
,
TRN_TYPE_BASIC_SCOPE_END
,
TRN_TYPE_GLOBAL_SCOPE
=
2000
,
TRN_TYPE_CREATE_DNODE
=
2001
,
...
...
@@ -511,6 +512,7 @@ static FORCE_INLINE void* tDecodeSMqOffsetObj(void* buf, SMqOffsetObj* pOffset)
return
buf
;
}
#if 0
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
int32_t status;
...
...
@@ -641,6 +643,7 @@ static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
pSub->unassignedVg = NULL;
}
}
#endif
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
...
...
@@ -659,6 +662,7 @@ typedef struct {
SSchemaWrapper
schema
;
}
SMqTopicObj
;
#if 0
typedef struct {
int64_t consumerId;
int64_t connId;
...
...
@@ -713,7 +717,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
buf = taosDecodeFixedI32(buf, &sz);
pConsumer
->
currentTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerObj
));
pConsumer->currentTopics = taosArrayInit(sz, sizeof(
void*
));
for (int32_t i = 0; i < sz; i++) {
char* topic;
buf = taosDecodeString(buf, &topic);
...
...
@@ -721,7 +725,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
}
buf = taosDecodeFixedI32(buf, &sz);
pConsumer
->
recentRemovedTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqConsumerObj
));
pConsumer->recentRemovedTopics = taosArrayInit(sz, sizeof(
void*
));
for (int32_t i = 0; i < sz; i++) {
char* topic;
buf = taosDecodeString(buf, &topic);
...
...
@@ -729,6 +733,132 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
}
return buf;
}
#endif
enum
{
CONSUMER_UPDATE__TOUCH
=
1
,
CONSUMER_UPDATE__ADD
,
CONSUMER_UPDATE__REMOVE
,
CONSUMER_UPDATE__LOST
,
CONSUMER_UPDATE__MODIFY
,
};
typedef
struct
{
int64_t
consumerId
;
char
cgroup
[
TSDB_CGROUP_LEN
];
int8_t
updateType
;
// used only for update
int32_t
epoch
;
int32_t
status
;
// hbStatus is not applicable to serialization
int32_t
hbStatus
;
// lock is used for topics update
SRWLatch
lock
;
SArray
*
currentTopics
;
// SArray<char*>
#if 0
SArray* waitingRebTopics; // SArray<char*>
#endif
SArray
*
rebNewTopics
;
// SArray<char*>
SArray
*
rebRemovedTopics
;
// SArray<char*>
}
SMqConsumerObj
;
SMqConsumerObj
*
tNewSMqConsumerObj
(
int64_t
consumerId
,
char
cgroup
[
TSDB_CGROUP_LEN
]);
void
tDeleteSMqConsumerObj
(
SMqConsumerObj
*
pConsumer
);
int32_t
tEncodeSMqConsumerObj
(
void
**
buf
,
const
SMqConsumerObj
*
pConsumer
);
void
*
tDecodeSMqConsumerObj
(
const
void
*
buf
,
SMqConsumerObj
*
pConsumer
);
typedef
struct
{
int32_t
vgId
;
char
*
qmsg
;
// char topic[TSDB_TOPIC_FNAME_LEN];
SEpSet
epSet
;
}
SMqVgEp
;
SMqVgEp
*
tCloneSMqVgEp
(
const
SMqVgEp
*
pVgEp
);
void
tDeleteSMqVgEp
(
SMqVgEp
*
pVgEp
);
int32_t
tEncodeSMqVgEp
(
void
**
buf
,
const
SMqVgEp
*
pVgEp
);
void
*
tDecodeSMqVgEp
(
const
void
*
buf
,
SMqVgEp
*
pVgEp
);
typedef
struct
{
int64_t
consumerId
;
// -1 for unassigned
SArray
*
vgs
;
// SArray<SMqVgEp*>
}
SMqConsumerEpInSub
;
SMqConsumerEpInSub
*
tCloneSMqConsumerEpInSub
(
const
SMqConsumerEpInSub
*
pEpInSub
);
void
tDeleteSMqConsumerEpInSub
(
SMqConsumerEpInSub
*
pEpInSub
);
int32_t
tEncodeSMqConsumerEpInSub
(
void
**
buf
,
const
SMqConsumerEpInSub
*
pEpInSub
);
void
*
tDecodeSMqConsumerEpInSub
(
const
void
*
buf
,
SMqConsumerEpInSub
*
pEpInSub
);
typedef
struct
{
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
];
SRWLatch
lock
;
int32_t
vgNum
;
SHashObj
*
consumerHash
;
// consumerId -> SMqConsumerEpInSub
}
SMqSubscribeObj
;
SMqSubscribeObj
*
tNewSubscribeObj
(
const
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
]);
SMqSubscribeObj
*
tCloneSubscribeObj
(
const
SMqSubscribeObj
*
pSub
);
void
tDeleteSubscribeObj
(
SMqSubscribeObj
*
pSub
);
int32_t
tEncodeSubscribeObj
(
void
**
buf
,
const
SMqSubscribeObj
*
pSub
);
void
*
tDecodeSubscribeObj
(
const
void
*
buf
,
SMqSubscribeObj
*
pSub
);
typedef
struct
{
int32_t
epoch
;
SArray
*
consumers
;
// SArray<SMqConsumerEpInSub*>
}
SMqSubActionLogEntry
;
SMqSubActionLogEntry
*
tCloneSMqSubActionLogEntry
(
SMqSubActionLogEntry
*
pEntry
);
void
tDeleteSMqSubActionLogEntry
(
SMqSubActionLogEntry
*
pEntry
);
int32_t
tEncodeSMqSubActionLogEntry
(
void
**
buf
,
const
SMqSubActionLogEntry
*
pEntry
);
void
*
tDecodeSMqSubActionLogEntry
(
const
void
*
buf
,
SMqSubActionLogEntry
*
pEntry
);
typedef
struct
{
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
];
SArray
*
logs
;
// SArray<SMqSubActionLogEntry*>
}
SMqSubActionLogObj
;
SMqSubActionLogObj
*
tCloneSMqSubActionLogObj
(
SMqSubActionLogObj
*
pLog
);
void
tDeleteSMqSubActionLogObj
(
SMqSubActionLogObj
*
pLog
);
int32_t
tEncodeSMqSubActionLogObj
(
void
**
buf
,
const
SMqSubActionLogObj
*
pLog
);
void
*
tDecodeSMqSubActionLogObj
(
const
void
*
buf
,
SMqSubActionLogObj
*
pLog
);
typedef
struct
{
int64_t
consumerId
;
char
cgroup
[
TSDB_CGROUP_LEN
];
SRWLatch
lock
;
SArray
*
vgs
;
// SArray<SMqVgEp*>
}
SMqConsumerEpObj
;
SMqConsumerEpObj
*
tCloneSMqConsumerEpObj
(
const
SMqConsumerEpObj
*
pConsumerEp
);
void
tDeleteSMqConsumerEpObj
(
SMqConsumerEpObj
*
pConsumerEp
);
int32_t
tEncodeSMqConsumerEpObj
(
void
**
buf
,
const
SMqConsumerEpObj
*
pConsumerEp
);
void
*
tDecodeSMqConsumerEpObj
(
const
void
*
buf
,
SMqConsumerEpObj
*
pConsumerEp
);
typedef
struct
{
const
SMqSubscribeObj
*
pOldSub
;
const
SMqTopicObj
*
pTopic
;
const
SMqRebSubscribe
*
pRebInfo
;
}
SMqRebInputObj
;
typedef
struct
{
int64_t
oldConsumerId
;
int64_t
newConsumerId
;
SMqVgEp
*
pVgEp
;
}
SMqRebOutputVg
;
#if 0
typedef struct {
int64_t consumerId;
} SMqRebOutputConsumer;
#endif
typedef
struct
{
SArray
*
rebVgs
;
// SArray<SMqRebOutputVg>
SArray
*
newConsumers
;
// SArray<int64_t>
SArray
*
removedConsumers
;
// SArray<int64_t>
SArray
*
touchedConsumers
;
// SArray<int64_t>
SMqSubscribeObj
*
pSub
;
SMqSubActionLogEntry
*
pLogEntry
;
}
SMqRebOutputObj
;
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
...
...
source/dnode/mnode/impl/inc/mndSubscribe.h
浏览文件 @
6a7f526f
...
...
@@ -26,9 +26,11 @@ int32_t mndInitSubscribe(SMnode *pMnode);
void
mndCleanupSubscribe
(
SMnode
*
pMnode
);
SMqSubscribeObj
*
mndAcquireSubscribe
(
SMnode
*
pMnode
,
const
char
*
CGroup
,
const
char
*
topicName
);
SMqSubscribeObj
*
mndAcquireSubscribeByKey
(
SMnode
*
pMnode
,
const
char
*
key
);
SMqSubscribeObj
*
mndAcquireSubscribeByKey
(
SMnode
*
pMnode
,
const
char
*
key
);
void
mndReleaseSubscribe
(
SMnode
*
pMnode
,
SMqSubscribeObj
*
pSub
);
int32_t
mndMakeSubscribeKey
(
char
*
key
,
const
char
*
cgroup
,
const
char
*
topicName
);
#ifdef __cplusplus
}
#endif
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
6a7f526f
此差异已折叠。
点击以展开。
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
6a7f526f
...
...
@@ -14,6 +14,403 @@
*/
#include "mndDef.h"
#include "mndConsumer.h"
SMqConsumerObj
*
tNewSMqConsumerObj
(
int64_t
consumerId
,
char
cgroup
[
TSDB_CGROUP_LEN
])
{
SMqConsumerObj
*
pConsumer
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqConsumerObj
));
if
(
pConsumer
==
NULL
)
{
return
NULL
;
}
pConsumer
->
consumerId
=
consumerId
;
memcpy
(
pConsumer
->
cgroup
,
cgroup
,
TSDB_CGROUP_LEN
);
pConsumer
->
epoch
=
0
;
pConsumer
->
status
=
MQ_CONSUMER_STATUS__MODIFY
;
pConsumer
->
hbStatus
=
0
;
taosInitRWLatch
(
&
pConsumer
->
lock
);
pConsumer
->
currentTopics
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
#if 0
pConsumer->waitingRebTopics = NULL;
#endif
pConsumer
->
rebNewTopics
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
pConsumer
->
rebRemovedTopics
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
if
(
pConsumer
->
currentTopics
==
NULL
||
pConsumer
->
rebNewTopics
==
NULL
||
pConsumer
->
rebRemovedTopics
==
NULL
)
{
taosArrayDestroy
(
pConsumer
->
currentTopics
);
taosArrayDestroy
(
pConsumer
->
rebNewTopics
);
taosArrayDestroy
(
pConsumer
->
rebRemovedTopics
);
taosMemoryFree
(
pConsumer
);
return
NULL
;
}
return
pConsumer
;
}
void
tDeleteSMqConsumerObj
(
SMqConsumerObj
*
pConsumer
)
{
if
(
pConsumer
->
currentTopics
)
{
taosArrayDestroyP
(
pConsumer
->
currentTopics
,
(
FDelete
)
taosMemoryFree
);
}
#if 0
if (pConsumer->waitingRebTopics) {
taosArrayDestroyP(pConsumer->waitingRebTopics, taosMemoryFree);
}
#endif
if
(
pConsumer
->
rebNewTopics
)
{
taosArrayDestroyP
(
pConsumer
->
rebNewTopics
,
(
FDelete
)
taosMemoryFree
);
}
if
(
pConsumer
->
rebRemovedTopics
)
{
taosArrayDestroyP
(
pConsumer
->
rebRemovedTopics
,
(
FDelete
)
taosMemoryFree
);
}
}
int32_t
tEncodeSMqConsumerObj
(
void
**
buf
,
const
SMqConsumerObj
*
pConsumer
)
{
int32_t
tlen
=
0
;
int32_t
sz
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pConsumer
->
cgroup
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pConsumer
->
updateType
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumer
->
epoch
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumer
->
status
);
// current topics
if
(
pConsumer
->
currentTopics
)
{
sz
=
taosArrayGetSize
(
pConsumer
->
currentTopics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
topic
=
taosArrayGetP
(
pConsumer
->
currentTopics
,
i
);
tlen
+=
taosEncodeString
(
buf
,
topic
);
}
}
else
{
tlen
+=
taosEncodeFixedI32
(
buf
,
0
);
}
#if 0
// waiting reb topics
if (pConsumer->waitingRebTopics) {
sz = taosArrayGetSize(pConsumer->waitingRebTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
char *topic = taosArrayGetP(pConsumer->waitingRebTopics, i);
tlen += taosEncodeString(buf, topic);
}
} else {
tlen += taosEncodeFixedI32(buf, 0);
}
#endif
// reb new topics
if
(
pConsumer
->
rebNewTopics
)
{
sz
=
taosArrayGetSize
(
pConsumer
->
rebNewTopics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
topic
=
taosArrayGetP
(
pConsumer
->
rebNewTopics
,
i
);
tlen
+=
taosEncodeString
(
buf
,
topic
);
}
}
else
{
tlen
+=
taosEncodeFixedI32
(
buf
,
0
);
}
// reb removed topics
if
(
pConsumer
->
rebRemovedTopics
)
{
sz
=
taosArrayGetSize
(
pConsumer
->
rebRemovedTopics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
topic
=
taosArrayGetP
(
pConsumer
->
rebRemovedTopics
,
i
);
tlen
+=
taosEncodeString
(
buf
,
topic
);
}
}
else
{
tlen
+=
taosEncodeFixedI32
(
buf
,
0
);
}
return
tlen
;
}
void
*
tDecodeSMqConsumerObj
(
const
void
*
buf
,
SMqConsumerObj
*
pConsumer
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
cgroup
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pConsumer
->
updateType
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumer
->
epoch
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumer
->
status
);
// current topics
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
currentTopics
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
topic
;
buf
=
taosDecodeString
(
buf
,
&
topic
);
taosArrayPush
(
pConsumer
->
currentTopics
,
&
topic
);
}
#if 0
// waiting reb topics
buf = taosDecodeFixedI32(buf, &sz);
pConsumer->waitingRebTopics = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
char *topic;
buf = taosDecodeString(buf, &topic);
taosArrayPush(pConsumer->waitingRebTopics, &topic);
}
#endif
// reb new topics
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
rebNewTopics
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
topic
;
buf
=
taosDecodeString
(
buf
,
&
topic
);
taosArrayPush
(
pConsumer
->
rebNewTopics
,
&
topic
);
}
// reb removed topics
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
rebRemovedTopics
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
topic
;
buf
=
taosDecodeString
(
buf
,
&
topic
);
taosArrayPush
(
pConsumer
->
rebRemovedTopics
,
&
topic
);
}
return
(
void
*
)
buf
;
}
SMqVgEp
*
tCloneSMqVgEp
(
const
SMqVgEp
*
pVgEp
)
{
SMqVgEp
*
pVgEpNew
=
taosMemoryMalloc
(
sizeof
(
SMqVgEp
));
if
(
pVgEpNew
==
NULL
)
return
NULL
;
pVgEpNew
->
vgId
=
pVgEp
->
vgId
;
pVgEpNew
->
qmsg
=
strdup
(
pVgEp
->
qmsg
);
/*memcpy(pVgEpNew->topic, pVgEp->topic, TSDB_TOPIC_FNAME_LEN);*/
pVgEpNew
->
epSet
=
pVgEp
->
epSet
;
return
pVgEpNew
;
}
void
tDeleteSMqVgEp
(
SMqVgEp
*
pVgEp
)
{
taosMemoryFree
(
pVgEp
->
qmsg
);
}
int32_t
tEncodeSMqVgEp
(
void
**
buf
,
const
SMqVgEp
*
pVgEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pVgEp
->
vgId
);
tlen
+=
taosEncodeString
(
buf
,
pVgEp
->
qmsg
);
/*tlen += taosEncodeString(buf, pVgEp->topic);*/
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pVgEp
->
epSet
);
return
tlen
;
}
void
*
tDecodeSMqVgEp
(
const
void
*
buf
,
SMqVgEp
*
pVgEp
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pVgEp
->
vgId
);
buf
=
taosDecodeString
(
buf
,
&
pVgEp
->
qmsg
);
/*buf = taosDecodeStringTo(buf, pVgEp->topic);*/
buf
=
taosDecodeSEpSet
(
buf
,
&
pVgEp
->
epSet
);
return
(
void
*
)
buf
;
}
SMqConsumerEpObj
*
tCloneSMqConsumerEpObj
(
const
SMqConsumerEpObj
*
pConsumerEp
)
{
SMqConsumerEpObj
*
pConsumerEpNew
=
taosMemoryMalloc
(
sizeof
(
SMqConsumerEpObj
));
if
(
pConsumerEpNew
==
NULL
)
return
NULL
;
pConsumerEpNew
->
consumerId
=
pConsumerEp
->
consumerId
;
memcpy
(
pConsumerEpNew
->
cgroup
,
pConsumerEp
->
cgroup
,
TSDB_CGROUP_LEN
);
taosInitRWLatch
(
&
pConsumerEpNew
->
lock
);
pConsumerEpNew
->
vgs
=
taosArrayDeepCopy
(
pConsumerEpNew
->
vgs
,
(
FCopy
)
tCloneSMqVgEp
);
return
pConsumerEpNew
;
}
void
tDeleteSMqConsumerEpObj
(
SMqConsumerEpObj
*
pConsumerEp
)
{
taosArrayDestroyEx
(
pConsumerEp
->
vgs
,
(
FDelete
)
tDeleteSMqVgEp
);
}
int32_t
tEncodeSMqConsumerEpObj
(
void
**
buf
,
const
SMqConsumerEpObj
*
pConsumerEp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumerEp
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pConsumerEp
->
cgroup
);
tlen
+=
taosEncodeArray
(
buf
,
pConsumerEp
->
vgs
,
(
FEncode
)
tEncodeSMqVgEp
);
return
tlen
;
}
void
*
tDecodeSMqConsumerEpObj
(
const
void
*
buf
,
SMqConsumerEpObj
*
pConsumerEp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumerEp
->
consumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumerEp
->
cgroup
);
buf
=
taosDecodeArray
(
buf
,
&
pConsumerEp
->
vgs
,
(
FDecode
)
tDecodeSMqVgEp
,
sizeof
(
SMqSubVgEp
));
return
(
void
*
)
buf
;
}
SMqConsumerEpInSub
*
tCloneSMqConsumerEpInSub
(
const
SMqConsumerEpInSub
*
pEpInSub
)
{
SMqConsumerEpInSub
*
pEpInSubNew
=
taosMemoryMalloc
(
sizeof
(
SMqConsumerEpInSub
));
if
(
pEpInSubNew
==
NULL
)
return
NULL
;
pEpInSubNew
->
consumerId
=
pEpInSub
->
consumerId
;
pEpInSubNew
->
vgs
=
taosArrayDeepCopy
(
pEpInSub
->
vgs
,
(
FCopy
)
tCloneSMqVgEp
);
return
pEpInSubNew
;
}
void
tDeleteSMqConsumerEpInSub
(
SMqConsumerEpInSub
*
pEpInSub
)
{
taosArrayDestroyEx
(
pEpInSub
->
vgs
,
(
FDelete
)
tDeleteSMqVgEp
);
}
int32_t
tEncodeSMqConsumerEpInSub
(
void
**
buf
,
const
SMqConsumerEpInSub
*
pEpInSub
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pEpInSub
->
consumerId
);
int32_t
sz
=
taosArrayGetSize
(
pEpInSub
->
vgs
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqVgEp
*
pVgEp
=
taosArrayGetP
(
pEpInSub
->
vgs
,
i
);
tlen
+=
tEncodeSMqVgEp
(
buf
,
pVgEp
);
}
/*tlen += taosEncodeArray(buf, pEpInSub->vgs, (FEncode)tEncodeSMqVgEp);*/
return
tlen
;
}
void
*
tDecodeSMqConsumerEpInSub
(
const
void
*
buf
,
SMqConsumerEpInSub
*
pEpInSub
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pEpInSub
->
consumerId
);
/*buf = taosDecodeArray(buf, &pEpInSub->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));*/
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pEpInSub
->
vgs
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqVgEp
*
pVgEp
=
taosMemoryMalloc
(
sizeof
(
SMqVgEp
));
buf
=
tDecodeSMqVgEp
(
buf
,
pVgEp
);
taosArrayPush
(
pEpInSub
->
vgs
,
&
pVgEp
);
}
return
(
void
*
)
buf
;
}
SMqSubscribeObj
*
tNewSubscribeObj
(
const
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
])
{
SMqSubscribeObj
*
pSubNew
=
taosMemoryMalloc
(
sizeof
(
SMqSubscribeObj
));
if
(
pSubNew
==
NULL
)
return
NULL
;
memcpy
(
pSubNew
->
key
,
key
,
TSDB_SUBSCRIBE_KEY_LEN
);
taosInitRWLatch
(
&
pSubNew
->
lock
);
pSubNew
->
vgNum
=
-
1
;
pSubNew
->
consumerHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
// TODO set free fp
SMqConsumerEpInSub
epInSub
=
{
.
consumerId
=
-
1
,
.
vgs
=
taosArrayInit
(
0
,
sizeof
(
void
*
)),
};
int64_t
unexistKey
=
-
1
;
taosHashPut
(
pSubNew
->
consumerHash
,
&
unexistKey
,
sizeof
(
int64_t
),
&
epInSub
,
sizeof
(
SMqConsumerEpInSub
));
return
pSubNew
;
}
SMqSubscribeObj
*
tCloneSubscribeObj
(
const
SMqSubscribeObj
*
pSub
)
{
SMqSubscribeObj
*
pSubNew
=
taosMemoryMalloc
(
sizeof
(
SMqSubscribeObj
));
if
(
pSubNew
==
NULL
)
return
NULL
;
memcpy
(
pSubNew
->
key
,
pSub
->
key
,
TSDB_SUBSCRIBE_KEY_LEN
);
taosInitRWLatch
(
&
pSubNew
->
lock
);
pSubNew
->
vgNum
=
pSub
->
vgNum
;
/*pSubNew->consumerEps = taosArrayDeepCopy(pSub->consumerEps, (FCopy)tCloneSMqConsumerEpInSub);*/
pSubNew
->
consumerHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
/*taosHashSetFreeFp(pSubNew->consumerHash, taosArrayDestroy);*/
void
*
pIter
=
NULL
;
SMqConsumerEpInSub
*
pEpInSub
=
NULL
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pSub
->
consumerHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
pEpInSub
=
(
SMqConsumerEpInSub
*
)
pIter
;
SMqConsumerEpInSub
newEp
=
{
.
consumerId
=
pEpInSub
->
consumerId
,
.
vgs
=
taosArrayDeepCopy
(
pEpInSub
->
vgs
,
(
FCopy
)
tCloneSMqVgEp
),
};
taosHashPut
(
pSubNew
->
consumerHash
,
&
newEp
.
consumerId
,
sizeof
(
int64_t
),
&
newEp
,
sizeof
(
SMqConsumerEpInSub
));
}
return
pSubNew
;
}
void
tDeleteSubscribeObj
(
SMqSubscribeObj
*
pSub
)
{
/*taosArrayDestroyEx(pSub->consumerEps, (FDelete)tDeleteSMqConsumerEpInSub);*/
taosHashCleanup
(
pSub
->
consumerHash
);
}
int32_t
tEncodeSubscribeObj
(
void
**
buf
,
const
SMqSubscribeObj
*
pSub
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pSub
->
key
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSub
->
vgNum
);
void
*
pIter
=
NULL
;
int32_t
sz
=
taosHashGetSize
(
pSub
->
consumerHash
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
int32_t
cnt
=
0
;
while
(
1
)
{
pIter
=
taosHashIterate
(
pSub
->
consumerHash
,
pIter
);
if
(
pIter
==
NULL
)
break
;
SMqConsumerEpInSub
*
pEpInSub
=
(
SMqConsumerEpInSub
*
)
pIter
;
tlen
+=
tEncodeSMqConsumerEpInSub
(
buf
,
pEpInSub
);
cnt
++
;
}
ASSERT
(
cnt
==
sz
);
/*tlen += taosEncodeArray(buf, pSub->consumerEps, (FEncode)tEncodeSMqConsumerEpInSub);*/
return
tlen
;
}
void
*
tDecodeSubscribeObj
(
const
void
*
buf
,
SMqSubscribeObj
*
pSub
)
{
//
buf
=
taosDecodeStringTo
(
buf
,
pSub
->
key
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSub
->
vgNum
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pSub
->
consumerHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_NO_LOCK
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
/*SMqConsumerEpInSub* pEpInSub = taosMemoryMalloc(sizeof(SMqConsumerEpInSub));*/
SMqConsumerEpInSub
epInSub
=
{
0
};
buf
=
tDecodeSMqConsumerEpInSub
(
buf
,
&
epInSub
);
taosHashPut
(
pSub
->
consumerHash
,
&
epInSub
.
consumerId
,
sizeof
(
int64_t
),
&
epInSub
,
sizeof
(
SMqConsumerEpInSub
));
}
/*buf = taosDecodeArray(buf, &pSub->consumerEps, (FDecode)tDecodeSMqConsumerEpInSub, sizeof(SMqConsumerEpInSub));*/
return
(
void
*
)
buf
;
}
SMqSubActionLogEntry
*
tCloneSMqSubActionLogEntry
(
SMqSubActionLogEntry
*
pEntry
)
{
SMqSubActionLogEntry
*
pEntryNew
=
taosMemoryMalloc
(
sizeof
(
SMqSubActionLogEntry
));
if
(
pEntryNew
==
NULL
)
return
NULL
;
pEntryNew
->
epoch
=
pEntry
->
epoch
;
pEntryNew
->
consumers
=
taosArrayDeepCopy
(
pEntry
->
consumers
,
(
FCopy
)
tCloneSMqConsumerEpInSub
);
return
pEntryNew
;
}
void
tDeleteSMqSubActionLogEntry
(
SMqSubActionLogEntry
*
pEntry
)
{
taosArrayDestroyEx
(
pEntry
->
consumers
,
(
FDelete
)
tDeleteSMqConsumerEpInSub
);
}
int32_t
tEncodeSMqSubActionLogEntry
(
void
**
buf
,
const
SMqSubActionLogEntry
*
pEntry
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI32
(
buf
,
pEntry
->
epoch
);
tlen
+=
taosEncodeArray
(
buf
,
pEntry
->
consumers
,
(
FEncode
)
tEncodeSMqSubActionLogEntry
);
return
tlen
;
}
void
*
tDecodeSMqSubActionLogEntry
(
const
void
*
buf
,
SMqSubActionLogEntry
*
pEntry
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pEntry
->
epoch
);
buf
=
taosDecodeArray
(
buf
,
&
pEntry
->
consumers
,
(
FDecode
)
tDecodeSMqSubActionLogEntry
,
sizeof
(
SMqSubActionLogEntry
));
return
(
void
*
)
buf
;
}
SMqSubActionLogObj
*
tCloneSMqSubActionLogObj
(
SMqSubActionLogObj
*
pLog
)
{
SMqSubActionLogObj
*
pLogNew
=
taosMemoryMalloc
(
sizeof
(
SMqSubActionLogObj
));
if
(
pLogNew
==
NULL
)
return
pLogNew
;
memcpy
(
pLogNew
->
key
,
pLog
->
key
,
TSDB_SUBSCRIBE_KEY_LEN
);
pLogNew
->
logs
=
taosArrayDeepCopy
(
pLog
->
logs
,
(
FCopy
)
tCloneSMqConsumerEpInSub
);
return
pLogNew
;
}
void
tDeleteSMqSubActionLogObj
(
SMqSubActionLogObj
*
pLog
)
{
taosArrayDestroyEx
(
pLog
->
logs
,
(
FDelete
)
tDeleteSMqConsumerEpInSub
);
}
int32_t
tEncodeSMqSubActionLogObj
(
void
**
buf
,
const
SMqSubActionLogObj
*
pLog
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeString
(
buf
,
pLog
->
key
);
tlen
+=
taosEncodeArray
(
buf
,
pLog
->
logs
,
(
FEncode
)
tEncodeSMqSubActionLogEntry
);
return
tlen
;
}
void
*
tDecodeSMqSubActionLogObj
(
const
void
*
buf
,
SMqSubActionLogObj
*
pLog
)
{
buf
=
taosDecodeStringTo
(
buf
,
pLog
->
key
);
buf
=
taosDecodeArray
(
buf
,
&
pLog
->
logs
,
(
FDecode
)
tDecodeSMqSubActionLogEntry
,
sizeof
(
SMqSubActionLogEntry
));
return
(
void
*
)
buf
;
}
int32_t
tEncodeSStreamObj
(
SCoder
*
pEncoder
,
const
SStreamObj
*
pObj
)
{
int32_t
sz
=
0
;
...
...
source/dnode/mnode/impl/src/mndScheduler.c
浏览文件 @
6a7f526f
...
...
@@ -434,7 +434,9 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
return
-
1
;
}
ASSERT
(
pSub
->
vgNum
==
0
);
ASSERT
(
pSub
->
vgNum
==
-
1
);
pSub
->
vgNum
=
0
;
int32_t
levelNum
=
LIST_LENGTH
(
pPlan
->
pSubplans
);
if
(
levelNum
!=
1
)
{
...
...
@@ -453,6 +455,12 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
}
SSubplan
*
plan
=
nodesListGetNode
(
inner
->
pNodeList
,
0
);
int64_t
unexistKey
=
-
1
;
SMqConsumerEpInSub
*
pEpInSub
=
taosHashGet
(
pSub
->
consumerHash
,
&
unexistKey
,
sizeof
(
int64_t
));
ASSERT
(
pEpInSub
);
ASSERT
(
taosHashGetSize
(
pSub
->
consumerHash
)
==
1
);
void
*
pIter
=
NULL
;
while
(
1
)
{
pIter
=
sdbFetch
(
pSdb
,
SDB_VGROUP
,
pIter
,
(
void
**
)
&
pVgroup
);
...
...
@@ -466,24 +474,41 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
plan
->
execNode
.
nodeId
=
pVgroup
->
vgId
;
plan
->
execNode
.
epSet
=
mndGetVgroupEpset
(
pMnode
,
pVgroup
);
SMqVgEp
*
pVgEp
=
taosMemoryMalloc
(
sizeof
(
SMqVgEp
));
pVgEp
->
epSet
=
plan
->
execNode
.
epSet
;
pVgEp
->
vgId
=
plan
->
execNode
.
nodeId
;
#if 0
SMqConsumerEp consumerEp = {0};
consumerEp.status = 0;
consumerEp.consumerId = -1;
consumerEp.epSet = plan->execNode.epSet;
consumerEp.vgId = plan->execNode.nodeId;
mDebug
(
"init subscribption %s, assign vg: %d"
,
pSub
->
key
,
consumerEp
.
vgId
);
#endif
mDebug
(
"init subscribption %s, assign vg: %d"
,
pSub
->
key
,
pVgEp
->
vgId
);
int32_t
msgLen
;
if
(
qSubPlanToString
(
plan
,
&
consumerEp
.
qmsg
,
&
msgLen
)
<
0
)
{
if
(
qSubPlanToString
(
plan
,
&
pVgEp
->
qmsg
,
&
msgLen
)
<
0
)
{
sdbRelease
(
pSdb
,
pVgroup
);
qDestroyQueryPlan
(
pPlan
);
terrno
=
TSDB_CODE_QRY_INVALID_INPUT
;
return
-
1
;
}
taosArrayPush
(
pSub
->
unassignedVg
,
&
consumerEp
);
taosArrayPush
(
pEpInSub
->
vgs
,
&
pVgEp
);
ASSERT
(
taosHashGetSize
(
pSub
->
consumerHash
)
==
1
);
/*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
}
ASSERT
(
pEpInSub
->
vgs
->
size
>
0
);
pEpInSub
=
taosHashGet
(
pSub
->
consumerHash
,
&
unexistKey
,
sizeof
(
int64_t
));
ASSERT
(
pEpInSub
->
vgs
->
size
>
0
);
ASSERT
(
taosHashGetSize
(
pSub
->
consumerHash
)
==
1
);
qDestroyQueryPlan
(
pPlan
);
return
0
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
6a7f526f
此差异已折叠。
点击以展开。
source/dnode/mnode/impl/src/mnode.c
浏览文件 @
6a7f526f
...
...
@@ -22,12 +22,14 @@
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
#include "mndGrant.h"
#include "mndInfoSchema.h"
#include "mndPerfSchema.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndPerfSchema.h"
#include "mndProfile.h"
#include "mndQnode.h"
#include "mndQuery.h"
#include "mndShow.h"
#include "mndSma.h"
#include "mndSnode.h"
...
...
@@ -40,8 +42,6 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "mndQuery.h"
#include "mndGrant.h"
#define MQ_TIMER_MS 3000
#define TRNAS_TIMER_MS 6000
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
6a7f526f
...
...
@@ -16,6 +16,13 @@
#ifndef _TD_VNODE_TQ_H_
#define _TD_VNODE_TQ_H_
#include "executor.h"
#include "os.h"
#include "thash.h"
#include "tmsg.h"
#include "ttimer.h"
#include "wal.h"
#ifdef __cplusplus
extern
"C"
{
#endif
...
...
@@ -30,12 +37,6 @@ extern "C" {
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
enum
{
TQ_STREAM_TOKEN__DATA
=
1
,
TQ_STREAM_TOKEN__WATERMARK
,
TQ_STREAM_TOKEN__CHECKPOINT
,
};
#define TQ_BUFFER_SIZE 4
#define TQ_BUCKET_MASK 0xFF
...
...
@@ -151,22 +152,27 @@ typedef struct {
}
STqMetaStore
;
typedef
struct
{
SMemAllocatorFactory
*
pAllocatorFactory
;
SMemAllocator
*
pAllocator
;
}
STqMemRef
;
char
subKey
[
TSDB_SUBSCRIBE_KEY_LEN
];
int64_t
consumerId
;
int32_t
epoch
;
char
*
qmsg
;
// SRWLatch lock;
SWalReadHandle
*
pReadHandle
;
// number should be identical to fetch thread num
qTaskInfo_t
task
[
4
];
}
STqExec
;
struct
STQ
{
// the collection of groups
// the handle of meta kvstore
bool
writeTrigger
;
char
*
path
;
STqMemRef
tqMemRef
;
STqMetaStore
*
tqMeta
;
// STqPushMgr* tqPushMgr;
SHashObj
*
pStreamTasks
;
SVnode
*
pVnode
;
SWal
*
pWal
;
SMeta
*
pVnodeMeta
;
SHashObj
*
tqMetaNew
;
// subKey -> tqExec
SHashObj
*
pStreamTasks
;
SVnode
*
pVnode
;
SWal
*
pWal
;
SMeta
*
pVnodeMeta
;
};
typedef
struct
{
...
...
@@ -230,10 +236,6 @@ typedef struct {
// TODO sync function
}
STqStreamPusher
;
typedef
struct
{
int8_t
type
;
// mq or stream
}
STqPusher
;
typedef
struct
{
SHashObj
*
pHash
;
// <id, STqPush*>
}
STqPushMgr
;
...
...
@@ -257,9 +259,10 @@ int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
int
tqCommit
(
STQ
*
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessRebReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessCancelConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessVgChangeReq
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
// int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
// int32_t tqProcessRebReq(STQ* pTq, char* msg);
// int32_t tqProcessCancelConnReq(STQ* pTq, char* msg);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
,
int32_t
workerId
);
...
...
@@ -314,4 +317,4 @@ STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet
}
#endif
#endif
/*_TD_VNODE_TQ_H_*/
\ No newline at end of file
#endif
/*_TD_VNODE_TQ_H_*/
source/dnode/vnode/src/tq/tq.c
浏览文件 @
6a7f526f
...
...
@@ -29,31 +29,14 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMe
pTq
->
pVnode
=
pVnode
;
pTq
->
pWal
=
pWal
;
pTq
->
pVnodeMeta
=
pVnodeMeta
;
#if 0
pTq->tqMemRef.pAllocatorFactory = allocFac;
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
if (pTq->tqMemRef.pAllocator == NULL) {
// TODO: error code of buffer pool
}
#endif
pTq
->
tqMeta
=
tqStoreOpen
(
pTq
,
path
,
(
FTqSerialize
)
tqSerializeConsumer
,
(
FTqDeserialize
)
tqDeserializeConsumer
,
(
FTqDelete
)
taosMemoryFree
,
0
);
if
(
pTq
->
tqMeta
==
NULL
)
{
taosMemoryFree
(
pTq
);
#if 0
allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
#endif
return
NULL
;
}
#if 0
pTq->tqPushMgr = tqPushMgrOpen();
if (pTq->tqPushMgr == NULL) {
// free store
taosMemoryFree(pTq);
return NULL;
}
#endif
pTq
->
tqMetaNew
=
taosHashInit
(
64
,
MurmurHash3_32
,
true
,
HASH_ENTRY_LOCK
);
pTq
->
pStreamTasks
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_INT
),
true
,
HASH_NO_LOCK
);
...
...
@@ -248,16 +231,15 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
return
0
;
}
#if 0
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
)
{
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
int64_t fetchOffset;
int64_t blockingTime = pReq->blockingTime;
int32_t reqEpoch = pReq->epoch;
SMqPollReqV2
*
pReq
=
pMsg
->
pCont
;
int64_t
consumerId
=
pReq
->
consumerId
;
int32_t
reqEpoch
=
pReq
->
epoch
;
int64_t
fetchOffset
;
if
(
pReq
->
currentOffset
==
TMQ_CONF__RESET_OFFSET__EARLIEAST
)
{
fetchOffset =
0
;
fetchOffset
=
walGetFirstVer
(
pTq
->
pWal
)
;
}
else
if
(
pReq
->
currentOffset
==
TMQ_CONF__RESET_OFFSET__LATEST
)
{
fetchOffset
=
walGetLastVer
(
pTq
->
pWal
);
}
else
{
...
...
@@ -267,65 +249,29 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
vDebug
(
"tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
pReq
->
currentOffset
,
fetchOffset
);
SMqPollRsp rsp = {
/*.consumerId = consumerId,*/
.numOfTopics = 0,
.pBlockData = NULL,
};
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
if (pConsumer == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode));
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
tmsgSendRsp(pMsg);
return 0;
}
STqExec
*
pExec
=
taosHashGet
(
pTq
->
tqMetaNew
,
pReq
->
subKey
,
strlen
(
pReq
->
subKey
));
ASSERT
(
pExec
);
int32_t consumerEpoch = atomic_load_32(&p
Consumer
->epoch);
int32_t
consumerEpoch
=
atomic_load_32
(
&
p
Exec
->
epoch
);
while
(
consumerEpoch
<
reqEpoch
)
{
consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch);
}
STqTopic* pTopic = NULL;
int32_t sz = taosArrayGetSize(pConsumer->topics);
for (int32_t i = 0; i < sz; i++) {
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
// TODO race condition
ASSERT(pConsumer->consumerId == consumerId);
if (strcmp(topic->topicName, pReq->topic) == 0) {
pTopic = topic;
break;
}
}
if (pTopic == NULL) {
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
TD_VID(pTq->pVnode));
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
tmsgSendRsp(pMsg);
return 0;
consumerEpoch
=
atomic_val_compare_exchange_32
(
&
pExec
->
epoch
,
consumerEpoch
,
reqEpoch
);
}
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
TD_VID(pTq->pVnode));
SMqDataBlkRsp
rsp
=
{
0
};
rsp
.
reqOffset
=
pReq
->
currentOffset
;
rsp.skipLogNum = 0;
rsp
.
blockDataLen
=
taosArrayInit
(
0
,
sizeof
(
int32_t
));
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
while
(
1
)
{
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
// TODO
consumerEpoch = atomic_load_32(&pConsumer->epoch);
consumerEpoch
=
atomic_load_32
(
&
pExec
->
epoch
);
if
(
consumerEpoch
>
reqEpoch
)
{
vDebug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
consumerEpoch
,
reqEpoch
);
break
;
}
SWalReadHead
*
pHead
;
if (walReadWithHandle_s(p
Topic->pReadh
andle, fetchOffset, &pHead) < 0) {
if
(
walReadWithHandle_s
(
p
Exec
->
pReadH
andle
,
fetchOffset
,
&
pHead
)
<
0
)
{
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
...
...
@@ -333,101 +279,86 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
TD_VID
(
pTq
->
pVnode
),
fetchOffset
);
break
;
}
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
pHead
->
msgType
);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if
(
pHead
->
msgType
==
TDMT_VND_SUBMIT
)
{
SSubmitReq
*
pCont
=
(
SSubmitReq
*
)
&
pHead
->
body
;
qTaskInfo_t task = p
Topic->buffer.output[workerId].task
;
qTaskInfo_t
task
=
p
Exec
->
task
[
workerId
]
;
ASSERT
(
task
);
qSetStreamInput
(
task
,
pCont
,
STREAM_DATA_TYPE_SUBMIT_BLOCK
);
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
while
(
1
)
{
SSDataBlock
*
pDataBlock
=
NULL
;
uint64_t ts;
uint64_t
ts
=
0
;
if
(
qExecTask
(
task
,
&
pDataBlock
,
&
ts
)
<
0
)
{
ASSERT(false);
}
if (pDataBlock == NULL) {
/*pos = fetchOffset % TQ_BUFFER_SIZE;*/
break;
ASSERT
(
0
);
}
if
(
pDataBlock
==
NULL
)
break
;
taosArrayPush(pRes, pDataBlock
);
}
ASSERT
(
pDataBlock
->
info
.
rows
!=
0
);
ASSERT
(
pDataBlock
->
info
.
numOfCols
!=
0
);
if (taosArrayGetSize(pRes) == 0) {
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
fetchOffset++;
rsp.skipLogNum++;
taosArrayDestroy(pRes);
continue;
int32_t
dataStrLen
=
sizeof
(
SRetrieveTableRsp
)
+
blockGetEncodeSize
(
pDataBlock
);
void
*
buf
=
taosMemoryCalloc
(
1
,
dataStrLen
);
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
buf
;
pRetrieve
->
useconds
=
ts
;
pRetrieve
->
precision
=
TSDB_DEFAULT_PRECISION
;
pRetrieve
->
compressed
=
0
;
pRetrieve
->
completed
=
1
;
pRetrieve
->
numOfRows
=
htonl
(
pDataBlock
->
info
.
rows
);
// TODO enable compress
int32_t
actualLen
=
0
;
blockCompressEncode
(
pDataBlock
,
pRetrieve
->
data
,
&
actualLen
,
pDataBlock
->
info
.
numOfCols
,
false
);
actualLen
+=
sizeof
(
SRetrieveTableRsp
);
ASSERT
(
actualLen
<=
dataStrLen
);
taosArrayPush
(
rsp
.
blockDataLen
,
&
actualLen
);
taosArrayPush
(
rsp
.
blockData
,
&
buf
);
rsp
.
blockNum
++
;
}
rsp.schema = pTopic->buffer.output[workerId].pReadHandle->pSchemaWrapper;
rsp.rspOffset = fetchOffset;
}
rsp.numOfTopics = 1;
rsp.pBlockData = pRes;
// TODO batch optimization
if
(
rsp
.
blockNum
!=
0
)
break
;
rsp
.
skipLogNum
++
;
fetchOffset
++
;
}
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
pMsg->code = -1;
taosMemoryFree(pHead);
return -1;
}
((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
((SMqRspHead*)buf)->epoch = pReq->epoch;
((SMqRspHead*)buf)->consumerId = consumerId;
ASSERT
(
taosArrayGetSize
(
rsp
.
blockData
)
==
rsp
.
blockNum
);
ASSERT
(
taosArrayGetSize
(
rsp
.
blockDataLen
)
==
rsp
.
blockNum
);
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRsp(&abuf, &rsp);
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
pHead->msgType, consumerId, pReq->epoch);
tmsgSendRsp(pMsg);
taosMemoryFree(pHead);
return 0;
} else {
taosMemoryFree(pHead);
fetchOffset++;
rsp.skipLogNum++;
}
}
if
(
rsp
.
blockNum
!=
0
)
rsp
.
rspOffset
=
fetchOffset
;
else
rsp
.
rspOffset
=
fetchOffset
-
1
;
/*if (blockingTime != 0) {*/
/*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/
/*} else {*/
int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp);
int32_t
tlen
=
sizeof
(
SMqRspHead
)
+
tEncodeSMqDataBlkRsp
(
NULL
,
&
rsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
pMsg
->
code
=
-
1
;
return
-
1
;
}
((
SMqRspHead
*
)
buf
)
->
mqMsgType
=
TMQ_MSG_TYPE__POLL_RSP
;
((
SMqRspHead
*
)
buf
)
->
epoch
=
pReq
->
epoch
;
rsp.rspOffset = fetchOffset - 1
;
((
SMqRspHead
*
)
buf
)
->
consumerId
=
consumerId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
tEncodeSMqPollRsp(&abuf, &rsp);
rsp.pBlockData = NULL;
tEncodeSMqDataBlkRsp
(
&
abuf
,
&
rsp
);
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
tmsgSendRsp
(
pMsg
);
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
pReq->epoch);
/*}*/
vDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld"
,
TD_VID
(
pTq
->
pVnode
),
fetchOffset
,
consumerId
,
pReq
->
epoch
,
rsp
.
blockNum
,
rsp
.
reqOffset
,
rsp
.
rspOffset
);
// TODO destroy
return
0
;
}
#endif
#if 0
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqPollReq* pReq = pMsg->pCont;
int64_t consumerId = pReq->consumerId;
...
...
@@ -436,7 +367,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int32_t reqEpoch = pReq->epoch;
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset
=
0
;
fetchOffset =
walGetFirstVer(pTq->pWal)
;
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
fetchOffset = walGetLastVer(pTq->pWal);
} else {
...
...
@@ -635,7 +566,56 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0;
}
#endif
// TODO: persist meta into tdb
int32_t
tqProcessVgChangeReq
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
)
{
SMqRebVgReq
req
;
tDecodeSMqRebVgReq
(
msg
,
&
req
);
// todo lock
STqExec
*
pExec
=
taosHashGet
(
pTq
->
tqMetaNew
,
req
.
subKey
,
strlen
(
req
.
subKey
));
if
(
pExec
==
NULL
)
{
ASSERT
(
req
.
oldConsumerId
==
-
1
);
ASSERT
(
req
.
newConsumerId
!=
-
1
);
STqExec
exec
=
{
0
};
pExec
=
&
exec
;
/*taosInitRWLatch(&pExec->lock);*/
memcpy
(
pExec
->
subKey
,
req
.
subKey
,
TSDB_SUBSCRIBE_KEY_LEN
);
pExec
->
consumerId
=
req
.
newConsumerId
;
pExec
->
epoch
=
-
1
;
pExec
->
qmsg
=
req
.
qmsg
;
req
.
qmsg
=
NULL
;
pExec
->
pReadHandle
=
walOpenReadHandle
(
pTq
->
pVnode
->
pWal
);
for
(
int32_t
i
=
0
;
i
<
4
;
i
++
)
{
STqReadHandle
*
pReadHandle
=
tqInitSubmitMsgScanner
(
pTq
->
pVnodeMeta
);
SReadHandle
handle
=
{
.
reader
=
pReadHandle
,
.
meta
=
pTq
->
pVnodeMeta
,
};
pExec
->
task
[
i
]
=
qCreateStreamExecTaskInfo
(
pExec
->
qmsg
,
&
handle
);
ASSERT
(
pExec
->
task
[
i
]);
}
taosHashPut
(
pTq
->
tqMetaNew
,
req
.
subKey
,
strlen
(
req
.
subKey
),
pExec
,
sizeof
(
STqExec
));
return
0
;
}
else
{
if
(
req
.
newConsumerId
!=
-
1
)
{
/*taosWLockLatch(&pExec->lock);*/
ASSERT
(
pExec
->
consumerId
==
req
.
oldConsumerId
);
// TODO handle qmsg and exec modification
atomic_store_64
(
&
pExec
->
consumerId
,
req
.
newConsumerId
);
atomic_add_fetch_32
(
&
pExec
->
epoch
,
1
);
/*taosWUnLockLatch(&pExec->lock);*/
return
0
;
}
else
{
// TODO
/*taosHashRemove(pTq->tqMetaNew, req.subKey, strlen(req.subKey));*/
return
0
;
}
}
}
#if 0
int32_t tqProcessRebReq(STQ* pTq, char* msg) {
SMqMVRebReq req = {0};
terrno = TSDB_CODE_SUCCESS;
...
...
@@ -754,6 +734,7 @@ int32_t tqProcessCancelConnReq(STQ* pTq, char* msg) {
terrno = TSDB_CODE_SUCCESS;
return 0;
}
#endif
int32_t
tqExpandTask
(
STQ
*
pTq
,
SStreamTask
*
pTask
,
int32_t
parallel
)
{
if
(
pTask
->
execType
==
TASK_EXEC__NONE
)
return
0
;
...
...
source/dnode/vnode/src/tq/tqMetaStore.c
浏览文件 @
6a7f526f
...
...
@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
// TODO:replace by an abstract file layer
// #include <fcntl.h>
// #include <string.h>
// #include <unistd.h>
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
6a7f526f
...
...
@@ -80,6 +80,13 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
pRsp
->
msgType
=
TDMT_VND_SUBMIT_RSP
;
vnodeProcessSubmitReq
(
pVnode
,
ptr
,
pRsp
);
break
;
case
TDMT_VND_MQ_VG_CHANGE
:
if
(
tqProcessVgChangeReq
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
// TODO: handle error
}
break
;
#if 0
case TDMT_VND_MQ_SET_CONN: {
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO: handle error
...
...
@@ -93,6 +100,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
if (tqProcessCancelConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
}
} break;
#endif
case
TDMT_VND_TASK_DEPLOY
:
{
if
(
tqProcessTaskDeploy
(
pVnode
->
pTq
,
POINTER_SHIFT
(
pMsg
->
pCont
,
sizeof
(
SMsgHead
)),
pMsg
->
contLen
-
sizeof
(
SMsgHead
))
<
0
)
{
...
...
@@ -309,4 +317,4 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg
pRsp
->
contLen
=
sizeof
(
SSubmitRsp
);
return
0
;
}
\ No newline at end of file
}
source/os/src/osFile.c
浏览文件 @
6a7f526f
...
...
@@ -38,7 +38,7 @@ extern int openU(const char *, int, ...); /* MsvcLibX UTF-8 version of open */
#include <sys/file.h>
#if !defined(_TD_DARWIN_64)
#include <sys/sendfile.h>
#include <sys/sendfile.h>
#endif
#include <sys/stat.h>
#include <unistd.h>
...
...
@@ -58,9 +58,9 @@ typedef int32_t FileFd;
typedef
struct
TdFile
{
TdThreadRwlock
rwlock
;
int
refId
;
FileFd
fd
;
FILE
*
fp
;
int
refId
;
FileFd
fd
;
FILE
*
fp
;
}
*
TdFilePtr
,
TdFile
;
#define FILE_WITH_LOCK 1
...
...
@@ -182,7 +182,7 @@ int32_t taosStatFile(const char *path, int64_t *size, int32_t *mtime) {
return
0
;
#else
struct
stat
fileStat
;
int32_t
code
=
stat
(
path
,
&
fileStat
);
int32_t
code
=
stat
(
path
,
&
fileStat
);
if
(
code
<
0
)
{
return
code
;
}
...
...
@@ -203,7 +203,7 @@ int32_t taosDevInoFile(const char *path, int64_t *stDev, int64_t *stIno) {
return
0
;
#else
struct
stat
fileStat
;
int32_t
code
=
stat
(
path
,
&
fileStat
);
int32_t
code
=
stat
(
path
,
&
fileStat
);
if
(
code
<
0
)
{
return
code
;
}
...
...
@@ -226,7 +226,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return
NULL
;
#else
int
fd
=
-
1
;
int
fd
=
-
1
;
FILE
*
fp
=
NULL
;
if
(
tdFileOptions
&
TD_FILE_STREAM
)
{
char
*
mode
=
NULL
;
...
...
@@ -325,7 +325,7 @@ int64_t taosReadFile(TdFilePtr pFile, void *buf, int64_t count) {
#if FILE_WITH_LOCK
taosThreadRwlockRdlock
(
&
(
pFile
->
rwlock
));
#endif
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
int64_t
leftbytes
=
count
;
int64_t
readbytes
;
char
*
tbuf
=
(
char
*
)
buf
;
...
...
@@ -365,7 +365,7 @@ int64_t taosPReadFile(TdFilePtr pFile, void *buf, int64_t count, int64_t offset)
#if FILE_WITH_LOCK
taosThreadRwlockRdlock
(
&
(
pFile
->
rwlock
));
#endif
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
int64_t
ret
=
pread
(
pFile
->
fd
,
buf
,
count
,
offset
);
#if FILE_WITH_LOCK
taosThreadRwlockUnlock
(
&
(
pFile
->
rwlock
));
...
...
@@ -380,7 +380,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
#if FILE_WITH_LOCK
taosThreadRwlockWrlock
(
&
(
pFile
->
rwlock
));
#endif
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
/*assert(pFile->fd >= 0); // Please check if you have closed the file.*/
int64_t
nleft
=
count
;
int64_t
nwritten
=
0
;
...
...
@@ -414,7 +414,7 @@ int64_t taosLSeekFile(TdFilePtr pFile, int64_t offset, int32_t whence) {
#if FILE_WITH_LOCK
taosThreadRwlockRdlock
(
&
(
pFile
->
rwlock
));
#endif
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
int64_t
ret
=
lseek
(
pFile
->
fd
,
(
long
)
offset
,
whence
);
#if FILE_WITH_LOCK
taosThreadRwlockUnlock
(
&
(
pFile
->
rwlock
));
...
...
@@ -429,10 +429,10 @@ int32_t taosFStatFile(TdFilePtr pFile, int64_t *size, int32_t *mtime) {
if
(
pFile
==
NULL
)
{
return
0
;
}
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
struct
stat
fileStat
;
int32_t
code
=
fstat
(
pFile
->
fd
,
&
fileStat
);
int32_t
code
=
fstat
(
pFile
->
fd
,
&
fileStat
);
if
(
code
<
0
)
{
return
code
;
}
...
...
@@ -456,7 +456,7 @@ int32_t taosLockFile(TdFilePtr pFile) {
if
(
pFile
==
NULL
)
{
return
0
;
}
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
return
(
int32_t
)
flock
(
pFile
->
fd
,
LOCK_EX
|
LOCK_NB
);
#endif
...
...
@@ -469,7 +469,7 @@ int32_t taosUnLockFile(TdFilePtr pFile) {
if
(
pFile
==
NULL
)
{
return
0
;
}
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
return
(
int32_t
)
flock
(
pFile
->
fd
,
LOCK_UN
|
LOCK_NB
);
#endif
...
...
@@ -529,7 +529,7 @@ int32_t taosFtruncateFile(TdFilePtr pFile, int64_t l_size) {
if
(
pFile
==
NULL
)
{
return
0
;
}
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
return
ftruncate
(
pFile
->
fd
,
l_size
);
#endif
...
...
@@ -637,7 +637,7 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co
}
off_t
len
=
count
;
while
(
len
>
0
)
{
char
buf
[
1024
*
16
];
char
buf
[
1024
*
16
];
off_t
n
=
sizeof
(
buf
);
if
(
len
<
n
)
n
=
len
;
size_t
m
=
fread
(
buf
,
1
,
n
,
in_file
);
...
...
@@ -662,7 +662,7 @@ int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t count) {
}
off_t
len
=
count
;
while
(
len
>
0
)
{
char
buf
[
1024
*
16
];
char
buf
[
1024
*
16
];
off_t
n
=
sizeof
(
buf
);
if
(
len
<
n
)
n
=
len
;
size_t
m
=
read
(
sfd
,
buf
,
n
);
...
...
@@ -750,7 +750,7 @@ void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length) {
if
(
pFile
==
NULL
)
{
return
NULL
;
}
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
assert
(
pFile
->
fd
>=
0
);
// Please check if you have closed the file.
void
*
ptr
=
mmap
(
NULL
,
length
,
PROT_READ
,
MAP_SHARED
,
pFile
->
fd
,
0
);
return
ptr
;
...
...
@@ -811,4 +811,4 @@ bool taosCheckAccessFile(const char *pathname, int32_t tdFileAccessOptions) {
bool
taosCheckExistFile
(
const
char
*
pathname
)
{
return
taosCheckAccessFile
(
pathname
,
TD_FILE_ACCESS_EXIST_OK
);
};
#endif // WINDOWS
#endif
// WINDOWS
source/util/src/tarray.c
浏览文件 @
6a7f526f
...
...
@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "tarray.h"
#include "tcoding.h"
SArray
*
taosArrayInit
(
size_t
size
,
size_t
elemSize
)
{
assert
(
elemSize
>
0
);
...
...
@@ -75,7 +76,7 @@ int32_t taosArrayEnsureCap(SArray* pArray, size_t newCap) {
}
void
*
taosArrayAddBatch
(
SArray
*
pArray
,
const
void
*
pData
,
int32_t
nEles
)
{
if
(
p
Array
==
NULL
||
p
Data
==
NULL
)
{
if
(
pData
==
NULL
)
{
return
NULL
;
}
...
...
@@ -317,7 +318,6 @@ void taosArrayClearEx(SArray* pArray, void (*fp)(void*)) {
pArray
->
size
=
0
;
}
void
*
taosArrayDestroy
(
SArray
*
pArray
)
{
if
(
pArray
)
{
taosMemoryFree
(
pArray
->
pData
);
...
...
@@ -327,7 +327,14 @@ void* taosArrayDestroy(SArray* pArray) {
return
NULL
;
}
void
taosArrayDestroyEx
(
SArray
*
pArray
,
void
(
*
fp
)(
void
*
))
{
void
taosArrayDestroyP
(
SArray
*
pArray
,
FDelete
fp
)
{
for
(
int32_t
i
=
0
;
i
<
pArray
->
size
;
i
++
)
{
fp
(
*
(
void
**
)
TARRAY_GET_ELEM
(
pArray
,
i
));
}
taosArrayDestroy
(
pArray
);
}
void
taosArrayDestroyEx
(
SArray
*
pArray
,
FDelete
fp
)
{
if
(
pArray
==
NULL
)
{
return
;
}
...
...
@@ -436,6 +443,39 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void
return
;
}
SArray
*
taosArrayDeepCopy
(
const
SArray
*
pSrc
,
FCopy
deepCopy
)
{
ASSERT
(
pSrc
->
elemSize
==
sizeof
(
void
*
));
SArray
*
pArray
=
taosArrayInit
(
pSrc
->
size
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
pSrc
->
size
;
i
++
)
{
void
*
clone
=
deepCopy
(
taosArrayGetP
(
pSrc
,
i
));
taosArrayPush
(
pArray
,
&
clone
);
}
return
pArray
;
}
int32_t
taosEncodeArray
(
void
**
buf
,
const
SArray
*
pArray
,
FEncode
encode
)
{
int32_t
tlen
=
0
;
int32_t
sz
=
pArray
->
size
;
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
void
*
data
=
taosArrayGetP
(
pArray
,
i
);
tlen
+=
encode
(
buf
,
data
);
}
return
tlen
;
}
void
*
taosDecodeArray
(
const
void
*
buf
,
SArray
**
pArray
,
FDecode
decode
,
int32_t
dataSz
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
*
pArray
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
void
*
data
=
taosMemoryCalloc
(
1
,
dataSz
);
buf
=
decode
(
buf
,
data
);
taosArrayPush
(
*
pArray
,
&
data
);
}
return
(
void
*
)
buf
;
}
// order array<type *>
void
taosArraySortPWithExt
(
SArray
*
pArray
,
__ext_compar_fn_t
fn
,
const
void
*
param
)
{
taosArrayGetSize
(
pArray
)
>
8
?
taosArrayQuickSort
(
pArray
,
fn
,
param
)
:
taosArrayInsertSort
(
pArray
,
fn
,
param
);
...
...
tests/script/tsim/tmq/basic1.sim
浏览文件 @
6a7f526f
...
...
@@ -33,8 +33,8 @@ endi
sql connect
$dbNamme = d0
print =============== create database , vgroup
1
sql create database $dbNamme vgroups
1
print =============== create database , vgroup
4
sql create database $dbNamme vgroups
4
sql use $dbNamme
print =============== create super table
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录