Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
8b348976
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
8b348976
编写于
8月 30, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
feat(tmq): support taosx
上级
cde4621e
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
243 addition
and
58 deletion
+243
-58
include/client/taos.h
include/client/taos.h
+1
-0
include/common/tcommon.h
include/common/tcommon.h
+1
-1
include/common/tmsg.h
include/common/tmsg.h
+30
-14
include/util/tencode.h
include/util/tencode.h
+17
-11
source/client/inc/clientInt.h
source/client/inc/clientInt.h
+18
-5
source/client/src/clientMain.c
source/client/src/clientMain.c
+13
-0
source/client/src/tmq.c
source/client/src/tmq.c
+60
-7
source/common/src/tmsg.c
source/common/src/tmsg.c
+102
-20
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+1
-0
未找到文件。
include/client/taos.h
浏览文件 @
8b348976
...
...
@@ -254,6 +254,7 @@ enum tmq_res_t {
TMQ_RES_INVALID
=
-
1
,
TMQ_RES_DATA
=
1
,
TMQ_RES_TABLE_META
=
2
,
TMQ_RES_TAOSX
=
3
,
};
typedef
struct
tmq_raw_data
{
...
...
include/common/tcommon.h
浏览文件 @
8b348976
...
...
@@ -73,6 +73,7 @@ enum {
TMQ_MSG_TYPE__POLL_RSP
,
TMQ_MSG_TYPE__POLL_META_RSP
,
TMQ_MSG_TYPE__EP_RSP
,
TMQ_MSG_TYPE__TAOSX_RSP
,
TMQ_MSG_TYPE__END_RSP
,
};
...
...
@@ -129,7 +130,6 @@ typedef struct SDataBlockInfo {
uint32_t
capacity
;
// TODO: optimize and remove following
int64_t
version
;
// used for stream, and need serialization
int64_t
ts
;
// used for stream, and need serialization
int32_t
childId
;
// used for stream, do not serialize
EStreamType
type
;
// used for stream, do not serialize
STimeWindow
calWin
;
// used for stream, do not serialize
...
...
include/common/tmsg.h
浏览文件 @
8b348976
...
...
@@ -276,7 +276,6 @@ struct SSchema {
char
name
[
TSDB_COL_NAME_LEN
];
};
typedef
struct
{
char
tbName
[
TSDB_TABLE_NAME_LEN
];
char
stbName
[
TSDB_TABLE_NAME_LEN
];
...
...
@@ -295,17 +294,15 @@ typedef struct {
SSchema
*
pSchemas
;
}
STableMetaRsp
;
typedef
struct
{
int32_t
code
;
int8_t
hashMeta
;
int64_t
uid
;
char
*
tblFName
;
int32_t
numOfRows
;
int32_t
affectedRows
;
int64_t
sver
;
STableMetaRsp
*
pMeta
;
int32_t
code
;
int8_t
hashMeta
;
int64_t
uid
;
char
*
tblFName
;
int32_t
numOfRows
;
int32_t
affectedRows
;
int64_t
sver
;
STableMetaRsp
*
pMeta
;
}
SSubmitBlkRsp
;
typedef
struct
{
...
...
@@ -320,7 +317,7 @@ typedef struct {
int32_t
tEncodeSSubmitRsp
(
SEncoder
*
pEncoder
,
const
SSubmitRsp
*
pRsp
);
int32_t
tDecodeSSubmitRsp
(
SDecoder
*
pDecoder
,
SSubmitRsp
*
pRsp
);
void
tFreeSSubmitBlkRsp
(
void
*
param
);
void
tFreeSSubmitBlkRsp
(
void
*
param
);
void
tFreeSSubmitRsp
(
SSubmitRsp
*
pRsp
);
#define COL_SMA_ON ((int8_t)0x1)
...
...
@@ -2049,8 +2046,8 @@ typedef struct {
STableMetaRsp
*
pMeta
;
}
SVCreateTbRsp
,
SVUpdateTbRsp
;
int
tEncodeSVCreateTbRsp
(
SEncoder
*
pCoder
,
const
SVCreateTbRsp
*
pRsp
);
int
tDecodeSVCreateTbRsp
(
SDecoder
*
pCoder
,
SVCreateTbRsp
*
pRsp
);
int
tEncodeSVCreateTbRsp
(
SEncoder
*
pCoder
,
const
SVCreateTbRsp
*
pRsp
);
int
tDecodeSVCreateTbRsp
(
SDecoder
*
pCoder
,
SVCreateTbRsp
*
pRsp
);
void
tFreeSVCreateTbRsp
(
void
*
param
);
int32_t
tSerializeSVCreateTbReq
(
void
**
buf
,
SVCreateTbReq
*
pReq
);
...
...
@@ -2961,6 +2958,25 @@ typedef struct {
int32_t
tEncodeSMqDataRsp
(
SEncoder
*
pEncoder
,
const
SMqDataRsp
*
pRsp
);
int32_t
tDecodeSMqDataRsp
(
SDecoder
*
pDecoder
,
SMqDataRsp
*
pRsp
);
typedef
struct
{
SMqRspHead
head
;
STqOffsetVal
reqOffset
;
STqOffsetVal
rspOffset
;
int32_t
blockNum
;
int8_t
withTbName
;
int8_t
withSchema
;
SArray
*
blockDataLen
;
SArray
*
blockData
;
SArray
*
blockTbName
;
SArray
*
blockSchema
;
int32_t
createTableNum
;
SArray
*
createTableLen
;
SArray
*
createTableReq
;
}
STaosxRsp
;
int32_t
tEncodeSTaosxRsp
(
SEncoder
*
pEncoder
,
const
STaosxRsp
*
pRsp
);
int32_t
tDecodeSTaosxRsp
(
SDecoder
*
pDecoder
,
STaosxRsp
*
pRsp
);
typedef
struct
{
SMqRspHead
head
;
char
cgroup
[
TSDB_CGROUP_LEN
];
...
...
include/util/tencode.h
浏览文件 @
8b348976
...
...
@@ -264,12 +264,14 @@ static FORCE_INLINE int32_t tEncodeDouble(SEncoder* pCoder, double val) {
static
FORCE_INLINE
int32_t
tEncodeBinary
(
SEncoder
*
pCoder
,
const
uint8_t
*
val
,
uint32_t
len
)
{
if
(
tEncodeU32v
(
pCoder
,
len
)
<
0
)
return
-
1
;
if
(
pCoder
->
data
)
{
if
(
TD_CODER_CHECK_CAPACITY_FAILED
(
pCoder
,
len
))
return
-
1
;
memcpy
(
TD_CODER_CURRENT
(
pCoder
),
val
,
len
);
}
if
(
len
)
{
if
(
pCoder
->
data
)
{
if
(
TD_CODER_CHECK_CAPACITY_FAILED
(
pCoder
,
len
))
return
-
1
;
memcpy
(
TD_CODER_CURRENT
(
pCoder
),
val
,
len
);
}
TD_CODER_MOVE_POS
(
pCoder
,
len
);
TD_CODER_MOVE_POS
(
pCoder
,
len
);
}
return
0
;
}
...
...
@@ -414,14 +416,18 @@ static int32_t tDecodeCStrTo(SDecoder* pCoder, char* val) {
static
FORCE_INLINE
int32_t
tDecodeBinaryAlloc
(
SDecoder
*
pCoder
,
void
**
val
,
uint64_t
*
len
)
{
uint64_t
length
=
0
;
if
(
tDecodeU64v
(
pCoder
,
&
length
)
<
0
)
return
-
1
;
if
(
len
)
*
len
=
length
;
if
(
length
)
{
if
(
len
)
*
len
=
length
;
if
(
TD_CODER_CHECK_CAPACITY_FAILED
(
pCoder
,
length
))
return
-
1
;
*
val
=
taosMemoryMalloc
(
length
);
if
(
*
val
==
NULL
)
return
-
1
;
memcpy
(
*
val
,
TD_CODER_CURRENT
(
pCoder
),
length
);
if
(
TD_CODER_CHECK_CAPACITY_FAILED
(
pCoder
,
length
))
return
-
1
;
*
val
=
taosMemoryMalloc
(
length
);
if
(
*
val
==
NULL
)
return
-
1
;
memcpy
(
*
val
,
TD_CODER_CURRENT
(
pCoder
),
length
);
TD_CODER_MOVE_POS
(
pCoder
,
length
);
TD_CODER_MOVE_POS
(
pCoder
,
length
);
}
else
{
*
val
=
NULL
;
}
return
0
;
}
...
...
source/client/inc/clientInt.h
浏览文件 @
8b348976
...
...
@@ -52,15 +52,17 @@ enum {
RES_TYPE__QUERY
=
1
,
RES_TYPE__TMQ
,
RES_TYPE__TMQ_META
,
RES_TYPE__TAOSX
,
};
#define SHOW_VARIABLES_RESULT_COLS 2
#define SHOW_VARIABLES_RESULT_FIELD1_LEN (TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE)
#define SHOW_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ || *(int8_t*)res == RES_TYPE__TAOSX)
#define TD_RES_TMQ_META(res) (*(int8_t*)res == RES_TYPE__TMQ_META)
#define TD_RES_TMQ_TAOSX(res) (*(int8_t*)res == RES_TYPE__TAOSX)
typedef
struct
SAppInstInfo
SAppInstInfo
;
...
...
@@ -198,8 +200,8 @@ typedef struct {
int32_t
vgId
;
SSchemaWrapper
schema
;
int32_t
resIter
;
SMqDataRsp
rsp
;
SReqResultInfo
resInfo
;
SMqDataRsp
rsp
;
}
SMqRspObj
;
typedef
struct
{
...
...
@@ -210,6 +212,17 @@ typedef struct {
SMqMetaRsp
metaRsp
;
}
SMqMetaRspObj
;
typedef
struct
{
int8_t
resType
;
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int32_t
vgId
;
SSchemaWrapper
schema
;
int32_t
resIter
;
SReqResultInfo
resInfo
;
STaosxRsp
rsp
;
}
SMqTaosxRspObj
;
typedef
struct
SRequestObj
{
int8_t
resType
;
// query or tmq
uint64_t
requestId
;
...
...
@@ -369,7 +382,7 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData*
int32_t
refreshMeta
(
STscObj
*
pTscObj
,
SRequestObj
*
pRequest
);
int32_t
updateQnodeList
(
SAppInstInfo
*
pInfo
,
SArray
*
pNodeList
);
void
doAsyncQuery
(
SRequestObj
*
pRequest
,
bool
forceUpdateMeta
);
int32_t
removeMeta
(
STscObj
*
pTscObj
,
SArray
*
tbList
);
int32_t
removeMeta
(
STscObj
*
pTscObj
,
SArray
*
tbList
);
int32_t
handleAlterTbExecRes
(
void
*
res
,
struct
SCatalog
*
pCatalog
);
int32_t
handleCreateTbExecRes
(
void
*
res
,
SCatalog
*
pCatalog
);
bool
qnodeRequired
(
SRequestObj
*
pRequest
);
...
...
source/client/src/clientMain.c
浏览文件 @
8b348976
...
...
@@ -184,6 +184,19 @@ void taos_free_result(TAOS_RES *res) {
SRequestObj
*
pRequest
=
(
SRequestObj
*
)
res
;
tscDebug
(
"0x%"
PRIx64
" taos_free_result start to free query"
,
pRequest
->
requestId
);
destroyRequest
(
pRequest
);
}
else
if
(
TD_RES_TMQ_TAOSX
(
res
))
{
SMqTaosxRspObj
*
pRsp
=
(
SMqTaosxRspObj
*
)
res
;
if
(
pRsp
->
rsp
.
blockData
)
taosArrayDestroyP
(
pRsp
->
rsp
.
blockData
,
taosMemoryFree
);
if
(
pRsp
->
rsp
.
blockDataLen
)
taosArrayDestroy
(
pRsp
->
rsp
.
blockDataLen
);
if
(
pRsp
->
rsp
.
withTbName
)
taosArrayDestroyP
(
pRsp
->
rsp
.
blockTbName
,
taosMemoryFree
);
if
(
pRsp
->
rsp
.
withSchema
)
taosArrayDestroyP
(
pRsp
->
rsp
.
blockSchema
,
(
FDelete
)
tDeleteSSchemaWrapper
);
// taosx
taosArrayDestroy
(
pRsp
->
rsp
.
createTableLen
);
taosArrayDestroyP
(
pRsp
->
rsp
.
createTableReq
,
taosMemoryFree
);
pRsp
->
resInfo
.
pRspMsg
=
NULL
;
doFreeReqResultInfo
(
&
pRsp
->
resInfo
);
taosMemoryFree
(
pRsp
);
}
else
if
(
TD_RES_TMQ
(
res
))
{
SMqRspObj
*
pRsp
=
(
SMqRspObj
*
)
res
;
if
(
pRsp
->
rsp
.
blockData
)
taosArrayDestroyP
(
pRsp
->
rsp
.
blockData
,
taosMemoryFree
);
...
...
source/client/src/tmq.c
浏览文件 @
8b348976
...
...
@@ -164,6 +164,7 @@ typedef struct {
union
{
SMqDataRsp
dataRsp
;
SMqMetaRsp
metaRsp
;
STaosxRsp
taosxRsp
;
};
}
SMqPollRspWrapper
;
...
...
@@ -1130,21 +1131,29 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tDecodeSMqDataRsp
(
&
decoder
,
&
pRspWrapper
->
dataRsp
);
tDecoderClear
(
&
decoder
);
memcpy
(
&
pRspWrapper
->
dataRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
}
else
{
ASSERT
(
rspType
==
TMQ_MSG_TYPE__POLL_META_RSP
);
tscDebug
(
"consumer:%"
PRId64
", recv poll: vgId:%d, req offset %"
PRId64
", rsp offset %"
PRId64
" type %d"
,
tmq
->
consumerId
,
pVg
->
vgId
,
pRspWrapper
->
dataRsp
.
reqOffset
.
version
,
pRspWrapper
->
dataRsp
.
rspOffset
.
version
,
rspType
);
}
else
if
(
rspType
==
TMQ_MSG_TYPE__POLL_META_RSP
)
{
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
pMsg
->
len
-
sizeof
(
SMqRspHead
));
tDecodeSMqMetaRsp
(
&
decoder
,
&
pRspWrapper
->
metaRsp
);
tDecoderClear
(
&
decoder
);
memcpy
(
&
pRspWrapper
->
metaRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
}
else
if
(
rspType
==
TMQ_MSG_TYPE__TAOSX_RSP
)
{
SDecoder
decoder
;
tDecoderInit
(
&
decoder
,
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
pMsg
->
len
-
sizeof
(
SMqRspHead
));
tDecodeSTaosxRsp
(
&
decoder
,
&
pRspWrapper
->
taosxRsp
);
tDecoderClear
(
&
decoder
);
memcpy
(
&
pRspWrapper
->
taosxRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
}
else
{
ASSERT
(
0
);
}
taosMemoryFree
(
pMsg
->
pData
);
tscDebug
(
"consumer:%"
PRId64
", recv poll: vgId:%d, req offset %"
PRId64
", rsp offset %"
PRId64
" type %d"
,
tmq
->
consumerId
,
pVg
->
vgId
,
pRspWrapper
->
dataRsp
.
reqOffset
.
version
,
pRspWrapper
->
dataRsp
.
rspOffset
.
version
,
rspType
);
taosWriteQitem
(
tmq
->
mqueue
,
pRspWrapper
);
tsem_post
(
&
tmq
->
rspSem
);
...
...
@@ -1443,6 +1452,24 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return
pRspObj
;
}
SMqTaosxRspObj
*
tmqBuildTaosxRspFromWrapper
(
SMqPollRspWrapper
*
pWrapper
)
{
SMqTaosxRspObj
*
pRspObj
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqTaosxRspObj
));
pRspObj
->
resType
=
RES_TYPE__TAOSX
;
tstrncpy
(
pRspObj
->
topic
,
pWrapper
->
topicHandle
->
topicName
,
TSDB_TOPIC_FNAME_LEN
);
tstrncpy
(
pRspObj
->
db
,
pWrapper
->
topicHandle
->
db
,
TSDB_DB_FNAME_LEN
);
pRspObj
->
vgId
=
pWrapper
->
vgHandle
->
vgId
;
pRspObj
->
resIter
=
-
1
;
memcpy
(
&
pRspObj
->
rsp
,
&
pWrapper
->
dataRsp
,
sizeof
(
SMqTaosxRspObj
));
pRspObj
->
resInfo
.
totalRows
=
0
;
pRspObj
->
resInfo
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
if
(
!
pWrapper
->
dataRsp
.
withSchema
)
{
setResSchemaInfo
(
&
pRspObj
->
resInfo
,
pWrapper
->
topicHandle
->
schema
.
pSchema
,
pWrapper
->
topicHandle
->
schema
.
nCols
);
}
return
pRspObj
;
}
int32_t
tmqPollImpl
(
tmq_t
*
tmq
,
int64_t
timeout
)
{
/*tscDebug("call poll");*/
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tmq
->
clientTopics
);
i
++
)
{
...
...
@@ -1595,6 +1622,30 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
pollRspWrapper
->
metaRsp
.
head
.
epoch
,
consumerEpoch
);
taosFreeQitem
(
pollRspWrapper
);
}
}
else
if
(
rspWrapper
->
tmqRspType
==
TMQ_MSG_TYPE__TAOSX_RSP
)
{
SMqPollRspWrapper
*
pollRspWrapper
=
(
SMqPollRspWrapper
*
)
rspWrapper
;
/*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/
int32_t
consumerEpoch
=
atomic_load_32
(
&
tmq
->
epoch
);
if
(
pollRspWrapper
->
taosxRsp
.
head
.
epoch
==
consumerEpoch
)
{
SMqClientVg
*
pVg
=
pollRspWrapper
->
vgHandle
;
/*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/
pVg
->
currentOffset
=
pollRspWrapper
->
taosxRsp
.
rspOffset
;
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
if
(
pollRspWrapper
->
taosxRsp
.
blockNum
==
0
)
{
taosFreeQitem
(
pollRspWrapper
);
rspWrapper
=
NULL
;
continue
;
}
// build rsp
SMqRspObj
*
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
);
taosFreeQitem
(
pollRspWrapper
);
return
pRsp
;
}
else
{
tscDebug
(
"msg discard since epoch mismatch: msg epoch %d, consumer epoch %d
\n
"
,
pollRspWrapper
->
taosxRsp
.
head
.
epoch
,
consumerEpoch
);
taosFreeQitem
(
pollRspWrapper
);
}
}
else
{
/*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
bool
reset
=
false
;
...
...
@@ -1707,9 +1758,11 @@ tmq_res_t tmq_get_res_type(TAOS_RES* res) {
}
else
if
(
TD_RES_TMQ_META
(
res
))
{
SMqMetaRspObj
*
pMetaRspObj
=
(
SMqMetaRspObj
*
)
res
;
if
(
pMetaRspObj
->
metaRsp
.
resMsgType
==
TDMT_VND_DELETE
)
{
return
TMQ_RES_
DATA
;
return
TMQ_RES_
TAOSX
;
}
return
TMQ_RES_TABLE_META
;
}
else
if
(
TD_RES_TMQ_TAOSX
(
res
))
{
return
TMQ_RES_TAOSX
;
}
else
{
return
TMQ_RES_INVALID
;
}
...
...
source/common/src/tmsg.c
浏览文件 @
8b348976
...
...
@@ -3330,7 +3330,7 @@ int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
return
0
;
}
void
tFreeSTableMetaRsp
(
void
*
pRsp
)
{
taosMemoryFreeClear
(((
STableMetaRsp
*
)
pRsp
)
->
pSchemas
);
}
void
tFreeSTableMetaRsp
(
void
*
pRsp
)
{
taosMemoryFreeClear
(((
STableMetaRsp
*
)
pRsp
)
->
pSchemas
);
}
void
tFreeSTableIndexRsp
(
void
*
info
)
{
if
(
NULL
==
info
)
{
...
...
@@ -5119,17 +5119,17 @@ int tDecodeSVCreateTbRsp(SDecoder *pCoder, SVCreateTbRsp *pRsp) {
}
else
{
pRsp
->
pMeta
=
NULL
;
}
tEndDecode
(
pCoder
);
return
0
;
}
void
tFreeSVCreateTbRsp
(
void
*
param
)
{
void
tFreeSVCreateTbRsp
(
void
*
param
)
{
if
(
NULL
==
param
)
{
return
;
}
SVCreateTbRsp
*
pRsp
=
(
SVCreateTbRsp
*
)
param
;
SVCreateTbRsp
*
pRsp
=
(
SVCreateTbRsp
*
)
param
;
if
(
pRsp
->
pMeta
)
{
taosMemoryFree
(
pRsp
->
pMeta
->
pSchemas
);
taosMemoryFree
(
pRsp
->
pMeta
);
...
...
@@ -5345,7 +5345,7 @@ static int32_t tDecodeSSubmitBlkRsp(SDecoder *pDecoder, SSubmitBlkRsp *pBlock) {
if
(
tDecodeI32v
(
pDecoder
,
&
pBlock
->
numOfRows
)
<
0
)
return
-
1
;
if
(
tDecodeI32v
(
pDecoder
,
&
pBlock
->
affectedRows
)
<
0
)
return
-
1
;
if
(
tDecodeI64v
(
pDecoder
,
&
pBlock
->
sver
)
<
0
)
return
-
1
;
int32_t
meta
=
0
;
if
(
tDecodeI32
(
pDecoder
,
&
meta
)
<
0
)
return
-
1
;
if
(
meta
)
{
...
...
@@ -5393,12 +5393,12 @@ int32_t tDecodeSSubmitRsp(SDecoder *pDecoder, SSubmitRsp *pRsp) {
return
0
;
}
void
tFreeSSubmitBlkRsp
(
void
*
param
)
{
void
tFreeSSubmitBlkRsp
(
void
*
param
)
{
if
(
NULL
==
param
)
{
return
;
}
SSubmitBlkRsp
*
pRsp
=
(
SSubmitBlkRsp
*
)
param
;
SSubmitBlkRsp
*
pRsp
=
(
SSubmitBlkRsp
*
)
param
;
taosMemoryFree
(
pRsp
->
tblFName
);
if
(
pRsp
->
pMeta
)
{
...
...
@@ -5407,7 +5407,6 @@ void tFreeSSubmitBlkRsp(void* param) {
}
}
void
tFreeSSubmitRsp
(
SSubmitRsp
*
pRsp
)
{
if
(
NULL
==
pRsp
)
return
;
...
...
@@ -5619,7 +5618,6 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp *pRsp) {
}
}
int32_t
tEncodeSMCreateStbRsp
(
SEncoder
*
pEncoder
,
const
SMCreateStbRsp
*
pRsp
)
{
if
(
tStartEncode
(
pEncoder
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pRsp
->
pMeta
->
pSchemas
?
1
:
0
)
<
0
)
return
-
1
;
...
...
@@ -5671,8 +5669,6 @@ void tFreeSMCreateStbRsp(SMCreateStbRsp *pRsp) {
}
}
int32_t
tEncodeSTqOffsetVal
(
SEncoder
*
pEncoder
,
const
STqOffsetVal
*
pOffsetVal
)
{
if
(
tEncodeI8
(
pEncoder
,
pOffsetVal
->
type
)
<
0
)
return
-
1
;
if
(
pOffsetVal
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
||
pOffsetVal
->
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
...
...
@@ -5690,7 +5686,7 @@ int32_t tEncodeSTqOffsetVal(SEncoder *pEncoder, const STqOffsetVal *pOffsetVal)
int32_t
tDecodeSTqOffsetVal
(
SDecoder
*
pDecoder
,
STqOffsetVal
*
pOffsetVal
)
{
if
(
tDecodeI8
(
pDecoder
,
&
pOffsetVal
->
type
)
<
0
)
return
-
1
;
if
(
pOffsetVal
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
||
pOffsetVal
->
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
if
(
pOffsetVal
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
||
pOffsetVal
->
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
if
(
tDecodeI64
(
pDecoder
,
&
pOffsetVal
->
uid
)
<
0
)
return
-
1
;
if
(
tDecodeI64
(
pDecoder
,
&
pOffsetVal
->
ts
)
<
0
)
return
-
1
;
}
else
if
(
pOffsetVal
->
type
==
TMQ_OFFSET__LOG
)
{
...
...
@@ -5712,7 +5708,7 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
snprintf
(
buf
,
maxLen
,
"offset(reset to latest)"
);
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__LOG
)
{
snprintf
(
buf
,
maxLen
,
"offset(log) ver:%"
PRId64
,
pVal
->
version
);
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
||
pVal
->
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
}
else
if
(
pVal
->
type
==
TMQ_OFFSET__SNAPSHOT_DATA
||
pVal
->
type
==
TMQ_OFFSET__SNAPSHOT_META
)
{
snprintf
(
buf
,
maxLen
,
"offset(ss data) uid:%"
PRId64
", ts:%"
PRId64
,
pVal
->
uid
,
pVal
->
ts
);
}
else
{
ASSERT
(
0
);
...
...
@@ -5813,17 +5809,17 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
return
0
;
}
int32_t
tEncodeSMqMetaRsp
(
SEncoder
*
pEncoder
,
const
SMqMetaRsp
*
pRsp
)
{
int32_t
tEncodeSMqMetaRsp
(
SEncoder
*
pEncoder
,
const
SMqMetaRsp
*
pRsp
)
{
if
(
tEncodeSTqOffsetVal
(
pEncoder
,
&
pRsp
->
rspOffset
)
<
0
)
return
-
1
;
if
(
tEncodeI16
(
pEncoder
,
pRsp
->
resMsgType
))
return
-
1
;
if
(
tEncodeBinary
(
pEncoder
,
pRsp
->
metaRsp
,
pRsp
->
metaRspLen
))
return
-
1
;
if
(
tEncodeI16
(
pEncoder
,
pRsp
->
resMsgType
))
return
-
1
;
if
(
tEncodeBinary
(
pEncoder
,
pRsp
->
metaRsp
,
pRsp
->
metaRspLen
))
return
-
1
;
return
0
;
}
int32_t
tDecodeSMqMetaRsp
(
SDecoder
*
pDecoder
,
SMqMetaRsp
*
pRsp
)
{
int32_t
tDecodeSMqMetaRsp
(
SDecoder
*
pDecoder
,
SMqMetaRsp
*
pRsp
)
{
if
(
tDecodeSTqOffsetVal
(
pDecoder
,
&
pRsp
->
rspOffset
)
<
0
)
return
-
1
;
if
(
tDecodeI16
(
pDecoder
,
&
pRsp
->
resMsgType
)
<
0
)
return
-
1
;
if
(
tDecodeBinaryAlloc
(
pDecoder
,
&
pRsp
->
metaRsp
,
(
uint64_t
*
)
&
pRsp
->
metaRspLen
)
<
0
)
return
-
1
;
if
(
tDecodeBinaryAlloc
(
pDecoder
,
&
pRsp
->
metaRsp
,
(
uint64_t
*
)
&
pRsp
->
metaRspLen
)
<
0
)
return
-
1
;
return
0
;
}
...
...
@@ -5893,6 +5889,92 @@ int32_t tDecodeSMqDataRsp(SDecoder *pDecoder, SMqDataRsp *pRsp) {
return
0
;
}
int32_t
tEncodeSTaosxRsp
(
SEncoder
*
pEncoder
,
const
STaosxRsp
*
pRsp
)
{
if
(
tEncodeSTqOffsetVal
(
pEncoder
,
&
pRsp
->
reqOffset
)
<
0
)
return
-
1
;
if
(
tEncodeSTqOffsetVal
(
pEncoder
,
&
pRsp
->
rspOffset
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pRsp
->
blockNum
)
<
0
)
return
-
1
;
if
(
pRsp
->
blockNum
!=
0
)
{
if
(
tEncodeI8
(
pEncoder
,
pRsp
->
withTbName
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pRsp
->
withSchema
)
<
0
)
return
-
1
;
for
(
int32_t
i
=
0
;
i
<
pRsp
->
blockNum
;
i
++
)
{
int32_t
bLen
=
*
(
int32_t
*
)
taosArrayGet
(
pRsp
->
blockDataLen
,
i
);
void
*
data
=
taosArrayGetP
(
pRsp
->
blockData
,
i
);
if
(
tEncodeBinary
(
pEncoder
,
(
const
uint8_t
*
)
data
,
bLen
)
<
0
)
return
-
1
;
if
(
pRsp
->
withSchema
)
{
SSchemaWrapper
*
pSW
=
(
SSchemaWrapper
*
)
taosArrayGetP
(
pRsp
->
blockSchema
,
i
);
if
(
tEncodeSSchemaWrapper
(
pEncoder
,
pSW
)
<
0
)
return
-
1
;
}
if
(
pRsp
->
withTbName
)
{
char
*
tbName
=
(
char
*
)
taosArrayGetP
(
pRsp
->
blockTbName
,
i
);
if
(
tEncodeCStr
(
pEncoder
,
tbName
)
<
0
)
return
-
1
;
}
}
}
if
(
tEncodeI32
(
pEncoder
,
pRsp
->
createTableNum
)
<
0
)
return
-
1
;
if
(
pRsp
->
createTableNum
)
{
for
(
int32_t
i
=
0
;
i
<
pRsp
->
createTableNum
;
i
++
)
{
void
*
createTableReq
=
taosArrayGetP
(
pRsp
->
createTableReq
,
i
);
int32_t
createTableLen
=
*
(
int32_t
*
)
taosArrayGet
(
pRsp
->
createTableLen
,
i
);
if
(
tEncodeBinary
(
pEncoder
,
createTableReq
,
createTableLen
)
<
0
)
return
-
1
;
}
}
return
0
;
}
int32_t
tDecodeSTaosxRsp
(
SDecoder
*
pDecoder
,
STaosxRsp
*
pRsp
)
{
if
(
tDecodeSTqOffsetVal
(
pDecoder
,
&
pRsp
->
reqOffset
)
<
0
)
return
-
1
;
if
(
tDecodeSTqOffsetVal
(
pDecoder
,
&
pRsp
->
rspOffset
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pRsp
->
blockNum
)
<
0
)
return
-
1
;
if
(
pRsp
->
blockNum
!=
0
)
{
pRsp
->
blockData
=
taosArrayInit
(
pRsp
->
blockNum
,
sizeof
(
void
*
));
pRsp
->
blockDataLen
=
taosArrayInit
(
pRsp
->
blockNum
,
sizeof
(
int32_t
));
if
(
tDecodeI8
(
pDecoder
,
&
pRsp
->
withTbName
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pRsp
->
withSchema
)
<
0
)
return
-
1
;
if
(
pRsp
->
withTbName
)
{
pRsp
->
blockTbName
=
taosArrayInit
(
pRsp
->
blockNum
,
sizeof
(
void
*
));
}
if
(
pRsp
->
withSchema
)
{
pRsp
->
blockSchema
=
taosArrayInit
(
pRsp
->
blockNum
,
sizeof
(
void
*
));
}
for
(
int32_t
i
=
0
;
i
<
pRsp
->
blockNum
;
i
++
)
{
void
*
data
;
uint64_t
bLen
;
if
(
tDecodeBinaryAlloc
(
pDecoder
,
&
data
,
&
bLen
)
<
0
)
return
-
1
;
taosArrayPush
(
pRsp
->
blockData
,
&
data
);
int32_t
len
=
bLen
;
taosArrayPush
(
pRsp
->
blockDataLen
,
&
len
);
if
(
pRsp
->
withSchema
)
{
SSchemaWrapper
*
pSW
=
(
SSchemaWrapper
*
)
taosMemoryCalloc
(
1
,
sizeof
(
SSchemaWrapper
));
if
(
pSW
==
NULL
)
return
-
1
;
if
(
tDecodeSSchemaWrapper
(
pDecoder
,
pSW
)
<
0
)
return
-
1
;
taosArrayPush
(
pRsp
->
blockSchema
,
&
pSW
);
}
if
(
pRsp
->
withTbName
)
{
char
*
tbName
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
tbName
)
<
0
)
return
-
1
;
taosArrayPush
(
pRsp
->
blockTbName
,
&
tbName
);
}
}
}
if
(
tDecodeI32
(
pDecoder
,
&
pRsp
->
createTableNum
)
<
0
)
return
-
1
;
if
(
pRsp
->
createTableNum
)
{
pRsp
->
createTableLen
=
taosArrayInit
(
pRsp
->
createTableNum
,
sizeof
(
int32_t
));
pRsp
->
createTableReq
=
taosArrayInit
(
pRsp
->
createTableNum
,
sizeof
(
void
*
));
for
(
int32_t
i
=
0
;
i
<
pRsp
->
createTableNum
;
i
++
)
{
void
*
pCreate
=
NULL
;
uint64_t
len
;
if
(
tDecodeBinaryAlloc
(
pDecoder
,
&
pCreate
,
&
len
)
<
0
)
return
-
1
;
int32_t
l
=
(
int32_t
)
len
;
taosArrayPush
(
pRsp
->
createTableLen
,
&
l
);
taosArrayPush
(
pRsp
->
createTableReq
,
&
pCreate
);
}
}
return
0
;
}
int32_t
tEncodeSSingleDeleteReq
(
SEncoder
*
pEncoder
,
const
SSingleDeleteReq
*
pReq
)
{
if
(
tEncodeI64
(
pEncoder
,
pReq
->
uid
)
<
0
)
return
-
1
;
if
(
tEncodeI64
(
pEncoder
,
pReq
->
ts
)
<
0
)
return
-
1
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
8b348976
...
...
@@ -287,6 +287,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
if
(
consumerVgNum
>
minVgCnt
)
{
if
(
imbCnt
<
imbConsumerNum
)
{
if
(
consumerVgNum
==
minVgCnt
+
1
)
{
imbCnt
++
;
continue
;
}
else
{
// pop until equal minVg + 1
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录