Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
9fed9752
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
9fed9752
编写于
4月 13, 2022
作者:
L
Liu Jicong
提交者:
GitHub
4月 13, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11467 from taosdata/feature/tq
fix: fix tmq result parse
上级
32b7c445
d4f71ebf
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
106 addition
and
64 deletion
+106
-64
example/src/tmq.c
example/src/tmq.c
+17
-6
include/common/tmsg.h
include/common/tmsg.h
+6
-9
source/client/src/clientMain.c
source/client/src/clientMain.c
+14
-10
source/client/src/tmq.c
source/client/src/tmq.c
+14
-10
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+30
-14
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+21
-11
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+4
-4
未找到文件。
example/src/tmq.c
浏览文件 @
9fed9752
...
...
@@ -19,8 +19,20 @@
#include <time.h>
#include "taos.h"
static
int
running
=
1
;
/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/
static
int
running
=
1
;
static
void
msg_process
(
TAOS_RES
*
msg
)
{
char
buf
[
1024
];
printf
(
"topic: %s
\n
"
,
tmq_get_topic_name
(
msg
));
printf
(
"vg:%d
\n
"
,
tmq_get_vgroup_id
(
msg
));
while
(
1
)
{
TAOS_ROW
row
=
taos_fetch_row
(
msg
);
if
(
row
==
NULL
)
break
;
TAOS_FIELD
*
fields
=
taos_fetch_fields
(
msg
);
int32_t
numOfFields
=
taos_field_count
(
msg
);
taos_print_row
(
buf
,
row
,
fields
,
numOfFields
);
printf
(
"%s
\n
"
,
buf
);
}
}
int32_t
init_env
()
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
...
...
@@ -42,8 +54,7 @@ int32_t init_env() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(10)) tags(t1 int)"
);
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, c1 int, c2 float, c4 int) tags(t1 int)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table st1, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -90,7 +101,7 @@ int32_t create_topic() {
/*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1 from ct1"
);
pRes
=
taos_query
(
pConn
,
"create topic topic_ctb_column as select ts, c1
, c2, c4
from ct1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topic_ctb_column, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -200,7 +211,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1000
);
if
(
tmqmessage
)
{
/*msg_process(tmqmessage);*/
msg_process
(
tmqmessage
);
tmq_message_destroy
(
tmqmessage
);
if
((
++
msg_count
%
MIN_COMMIT_COUNT
)
==
0
)
tmq_commit
(
tmq
,
NULL
,
0
);
...
...
include/common/tmsg.h
浏览文件 @
9fed9752
...
...
@@ -2365,11 +2365,10 @@ typedef struct {
}
SMqSubVgEp
;
typedef
struct
{
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
int8_t
isSchemaAdaptive
;
SArray
*
vgs
;
// SArray<SMqSubVgEp>
int32_t
numOfFields
;
TAOS_FIELD
*
fields
;
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
int8_t
isSchemaAdaptive
;
SArray
*
vgs
;
// SArray<SMqSubVgEp>
SSchemaWrapper
schema
;
}
SMqSubTopicEp
;
typedef
struct
{
...
...
@@ -2468,8 +2467,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp
SMqSubVgEp
*
pVgEp
=
(
SMqSubVgEp
*
)
taosArrayGet
(
pTopicEp
->
vgs
,
i
);
tlen
+=
tEncodeSMqSubVgEp
(
buf
,
pVgEp
);
}
tlen
+=
taosEncodeFixedI32
(
buf
,
pTopicEp
->
numOfFields
);
// tlen += taosEncodeBinary(buf, pTopicEp->fields, pTopicEp->numOfFields * sizeof(TAOS_FIELD));
tlen
+=
taosEncodeSSchemaWrapper
(
buf
,
&
pTopicEp
->
schema
);
return
tlen
;
}
...
...
@@ -2487,8 +2485,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
buf
=
tDecodeSMqSubVgEp
(
buf
,
&
vgEp
);
taosArrayPush
(
pTopicEp
->
vgs
,
&
vgEp
);
}
buf
=
taosDecodeFixedI32
(
buf
,
&
pTopicEp
->
numOfFields
);
// buf = taosDecodeBinary(buf, (void**)&pTopicEp->fields, pTopicEp->numOfFields * sizeof(TAOS_FIELD));
buf
=
taosDecodeSSchemaWrapper
(
buf
,
&
pTopicEp
->
schema
);
return
buf
;
}
...
...
source/client/src/clientMain.c
浏览文件 @
9fed9752
...
...
@@ -171,21 +171,25 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return
doFetchRow
(
pRequest
,
true
,
true
);
}
else
if
(
TD_RES_TMQ
(
res
))
{
SMqRspObj
*
msg
=
((
SMqRspObj
*
)
res
);
SMqRspObj
*
msg
=
((
SMqRspObj
*
)
res
);
if
(
msg
->
resIter
==
-
1
)
msg
->
resIter
++
;
SReqResultInfo
*
pResultInfo
=
taosArrayGet
(
msg
->
res
,
msg
->
resIter
);
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
if
(
pResultInfo
->
row
==
NULL
)
{
msg
->
resIter
++
;
pResultInfo
=
taosArrayGet
(
msg
->
res
,
msg
->
resIter
);
if
(
pResultInfo
->
current
<
pResultInfo
->
numOfRows
)
{
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
return
pResultInfo
->
row
;
}
else
{
msg
->
resIter
++
;
if
(
msg
->
resIter
<
taosArrayGetSize
(
msg
->
res
))
{
pResultInfo
=
taosArrayGet
(
msg
->
res
,
msg
->
resIter
);
doSetOneRowPtr
(
pResultInfo
);
pResultInfo
->
current
+=
1
;
return
pResultInfo
->
row
;
}
else
{
return
NULL
;
}
}
return
pResultInfo
->
row
;
}
else
{
// assert to avoid uninitialization error
ASSERT
(
0
);
...
...
source/client/src/tmq.c
浏览文件 @
9fed9752
...
...
@@ -119,14 +119,14 @@ typedef struct {
typedef
struct
{
// subscribe info
int32_t
sqlLen
;
char
*
sql
;
char
*
topicName
;
int64_t
topicId
;
SArray
*
vgs
;
// SArray<SMqClientVg>
int8_t
isSchemaAdaptive
;
int32_t
numOfFields
;
TAOS_FIELD
*
fields
;
int32_t
sqlLen
;
char
*
sql
;
char
*
topicName
;
int64_t
topicId
;
SArray
*
vgs
;
// SArray<SMqClientVg>
int8_t
isSchemaAdaptive
;
int32_t
numOfFields
;
SSchemaWrapper
schema
;
}
SMqClientTopic
;
typedef
struct
{
...
...
@@ -956,6 +956,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
for
(
int32_t
i
=
0
;
i
<
topicNumGet
;
i
++
)
{
SMqClientTopic
topic
=
{
0
};
SMqSubTopicEp
*
pTopicEp
=
taosArrayGet
(
pRsp
->
topics
,
i
);
topic
.
schema
=
pTopicEp
->
schema
;
taosHashClear
(
pHash
);
topic
.
topicName
=
strdup
(
pTopicEp
->
topic
);
...
...
@@ -1191,7 +1192,10 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
for
(
int32_t
i
=
0
;
i
<
blockNum
;
i
++
)
{
int32_t
pos
=
*
(
int32_t
*
)
taosArrayGet
(
pRsp
->
blockPos
,
i
);
SRetrieveTableRsp
*
pRetrieve
=
POINTER_SHIFT
(
pRsp
->
blockData
,
pos
);
SReqResultInfo
resInfo
;
SReqResultInfo
resInfo
=
{
0
};
resInfo
.
totalRows
=
0
;
resInfo
.
precision
=
TSDB_TIME_PRECISION_MILLI
;
setResSchemaInfo
(
&
resInfo
,
pWrapper
->
topicHandle
->
schema
.
pSchema
,
pWrapper
->
topicHandle
->
schema
.
nCols
);
setQueryResultFromRsp
(
&
resInfo
,
pRetrieve
,
true
);
taosArrayPush
(
pRspObj
->
res
,
&
resInfo
);
}
...
...
@@ -1386,7 +1390,7 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
rspWrapper
=
NULL
;
continue
;
}
// build
msg
// build
rsp
SMqRspObj
*
pRsp
=
tmqBuildRspFromWrapper
(
pollRspWrapper
);
return
pRsp
;
}
else
{
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
9fed9752
...
...
@@ -60,8 +60,10 @@ static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg);
static
int32_t
mndPersistMqSetConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqTopicObj
*
pTopic
,
const
char
*
cgroup
,
const
SMqConsumerEp
*
pConsumerEp
);
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
);
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
);
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
);
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
);
int32_t
mndInitSubscribe
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SUBSCRIBE
,
...
...
@@ -102,7 +104,8 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
return
pSub
;
}
static
int32_t
mndBuildRebalanceMsg
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
)
{
static
int32_t
mndBuildRebalanceMsg
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
)
{
SMqMVRebReq
req
=
{
.
vgId
=
pConsumerEp
->
vgId
,
.
oldConsumerId
=
pConsumerEp
->
oldConsumerId
,
...
...
@@ -131,7 +134,8 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
return
0
;
}
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
)
{
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
)
{
ASSERT
(
pConsumerEp
->
oldConsumerId
!=
-
1
);
void
*
buf
;
...
...
@@ -158,7 +162,8 @@ static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqC
return
0
;
}
static
int32_t
mndBuildCancelConnReq
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
)
{
static
int32_t
mndBuildCancelConnReq
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
)
{
SMqCancelConnReq
req
=
{
0
};
req
.
consumerId
=
pConsumerEp
->
consumerId
;
req
.
vgId
=
pConsumerEp
->
vgId
;
...
...
@@ -182,7 +187,8 @@ static int32_t mndBuildCancelConnReq(void **pBuf, int32_t *pLen, const SMqConsum
return
0
;
}
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
)
{
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
)
{
void
*
buf
;
int32_t
tlen
;
if
(
mndBuildCancelConnReq
(
&
buf
,
&
tlen
,
pConsumerEp
,
oldTopicName
)
<
0
)
{
...
...
@@ -219,13 +225,14 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
terrno
=
TSDB_CODE_MND_CONSUMER_NOT_EXIST
;
return
-
1
;
}
//TODO add lock
//
TODO add lock
ASSERT
(
strcmp
(
pReq
->
cgroup
,
pConsumer
->
cgroup
)
==
0
);
int32_t
serverEpoch
=
pConsumer
->
epoch
;
int32_t
serverEpoch
=
pConsumer
->
epoch
;
// TODO
int32_t
hbStatus
=
atomic_load_32
(
&
pConsumer
->
hbStatus
);
mDebug
(
"consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d"
,
consumerId
,
epoch
,
serverEpoch
,
hbStatus
);
mDebug
(
"consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d"
,
consumerId
,
epoch
,
serverEpoch
,
hbStatus
);
atomic_store_32
(
&
pConsumer
->
hbStatus
,
0
);
/*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
...
...
@@ -233,7 +240,8 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
strcpy
(
rsp
.
cgroup
,
pReq
->
cgroup
);
if
(
epoch
!=
serverEpoch
)
{
mInfo
(
"send new assignment to consumer %ld, consumer epoch %d, server epoch %d"
,
pConsumer
->
consumerId
,
epoch
,
serverEpoch
);
mInfo
(
"send new assignment to consumer %ld, consumer epoch %d, server epoch %d"
,
pConsumer
->
consumerId
,
epoch
,
serverEpoch
);
mDebug
(
"consumer %ld try r lock"
,
consumerId
);
taosRLockLatch
(
&
pConsumer
->
lock
);
mDebug
(
"consumer %ld r locked"
,
consumerId
);
...
...
@@ -251,8 +259,15 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
if
(
consumerId
==
pSubConsumer
->
consumerId
)
{
int32_t
vgsz
=
taosArrayGetSize
(
pSubConsumer
->
vgInfo
);
mInfo
(
"topic %s has %d vg"
,
topicName
,
serverEpoch
);
SMqSubTopicEp
topicEp
;
strcpy
(
topicEp
.
topic
,
topicName
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
topicName
);
ASSERT
(
pTopic
!=
NULL
);
topicEp
.
schema
=
pTopic
->
schema
;
mndReleaseTopic
(
pMnode
,
pTopic
);
topicEp
.
vgs
=
taosArrayInit
(
vgsz
,
sizeof
(
SMqSubVgEp
));
for
(
int32_t
k
=
0
;
k
<
vgsz
;
k
++
)
{
char
offsetKey
[
TSDB_PARTITION_KEY_LEN
];
...
...
@@ -409,7 +424,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribeByKey
(
pMnode
,
pRebSub
->
key
);
taosMemoryFreeClear
(
pRebSub
->
key
);
mInfo
(
"mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d"
,
pSub
->
key
,
pSub
->
vgNum
,
(
int32_t
)
taosArrayGetSize
(
pSub
->
unassignedVg
));
mInfo
(
"mq rebalance subscription: %s, vgNum: %d, unassignedVg: %d"
,
pSub
->
key
,
pSub
->
vgNum
,
(
int32_t
)
taosArrayGetSize
(
pSub
->
unassignedVg
));
// remove lost consumer
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
pRebSub
->
lostConsumers
);
i
++
)
{
...
...
@@ -459,12 +475,12 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
mDebug
(
"consumer %ld try w lock"
,
pRebConsumer
->
consumerId
);
taosWLockLatch
(
&
pRebConsumer
->
lock
);
mDebug
(
"consumer %ld w locked"
,
pRebConsumer
->
consumerId
);
int32_t
status
=
atomic_load_32
(
&
pRebConsumer
->
status
);
int32_t
status
=
atomic_load_32
(
&
pRebConsumer
->
status
);
if
(
vgThisConsumerAfterRb
!=
vgThisConsumerBeforeRb
||
(
vgThisConsumerAfterRb
!=
0
&&
status
!=
MQ_CONSUMER_STATUS__ACTIVE
)
||
(
vgThisConsumerAfterRb
==
0
&&
status
!=
MQ_CONSUMER_STATUS__LOST
))
{
/*if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb) {*/
/*pRebConsumer->epoch++;*/
/*pRebConsumer->epoch++;*/
/*}*/
if
(
vgThisConsumerAfterRb
!=
0
)
{
atomic_store_32
(
&
pRebConsumer
->
status
,
MQ_CONSUMER_STATUS__ACTIVE
);
...
...
@@ -500,7 +516,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
pConsumerEp
->
oldConsumerId
=
pConsumerEp
->
consumerId
;
pConsumerEp
->
consumerId
=
pSubConsumer
->
consumerId
;
//TODO
//
TODO
pConsumerEp
->
epoch
=
0
;
taosArrayPush
(
pSubConsumer
->
vgInfo
,
pConsumerEp
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
9fed9752
...
...
@@ -551,30 +551,40 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
rspV2
.
rspOffset
=
fetchOffset
;
int32_t
blockSz
=
taosArrayGetSize
(
pRes
);
int32_t
tl
en
=
0
;
int32_t
dataBlockStrL
en
=
0
;
for
(
int32_t
i
=
0
;
i
<
blockSz
;
i
++
)
{
SSDataBlock
*
pBlock
=
taosArrayGet
(
pRes
,
i
);
tl
en
+=
sizeof
(
SRetrieveTableRsp
)
+
blockGetEncodeSize
(
pBlock
);
dataBlockStrL
en
+=
sizeof
(
SRetrieveTableRsp
)
+
blockGetEncodeSize
(
pBlock
);
}
void
*
data
=
taosMemoryMalloc
(
tl
en
);
if
(
data
==
NULL
)
{
void
*
data
BlockBuf
=
taosMemoryMalloc
(
dataBlockStrL
en
);
if
(
data
BlockBuf
==
NULL
)
{
pMsg
->
code
=
-
1
;
taosMemoryFree
(
pHead
);
}
rspV2
.
blockData
=
data
;
rspV2
.
blockData
=
data
BlockBuf
;
void
*
dataBlockBuf
=
data
;
int32_t
pos
;
rspV2
.
blockPos
=
taosArrayInit
(
blockSz
,
sizeof
(
int32_t
));
for
(
int32_t
i
=
0
;
i
<
blockSz
;
i
++
)
{
pos
=
0
;
SSDataBlock
*
pBlock
=
taosArrayGet
(
pRes
,
i
);
blockCompressEncode
(
pBlock
,
dataBlockBuf
,
&
pos
,
pBlock
->
info
.
numOfCols
,
false
);
SSDataBlock
*
pBlock
=
taosArrayGet
(
pRes
,
i
);
SRetrieveTableRsp
*
pRetrieve
=
(
SRetrieveTableRsp
*
)
dataBlockBuf
;
pRetrieve
->
useconds
=
0
;
pRetrieve
->
precision
=
0
;
pRetrieve
->
compressed
=
0
;
pRetrieve
->
completed
=
1
;
pRetrieve
->
numOfRows
=
htonl
(
pBlock
->
info
.
rows
);
blockCompressEncode
(
pBlock
,
pRetrieve
->
data
,
&
pos
,
pBlock
->
info
.
numOfCols
,
false
);
taosArrayPush
(
rspV2
.
blockPos
,
&
rspV2
.
dataLen
);
rspV2
.
dataLen
+=
pos
;
dataBlockBuf
=
POINTER_SHIFT
(
dataBlockBuf
,
pos
);
int32_t
totLen
=
sizeof
(
SRetrieveTableRsp
)
+
pos
;
pRetrieve
->
compLen
=
htonl
(
totLen
);
rspV2
.
dataLen
+=
totLen
;
dataBlockBuf
=
POINTER_SHIFT
(
dataBlockBuf
,
totLen
);
}
ASSERT
(
POINTER_DISTANCE
(
dataBlockBuf
,
rspV2
.
blockData
)
<=
dataBlockStrLen
);
int32_t
msgLen
=
sizeof
(
SMqRspHead
)
+
tEncodeSMqPollRspV2
(
NULL
,
&
rspV2
);
void
*
buf
=
rpcMallocCont
(
msgLen
);
...
...
@@ -590,7 +600,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tl
en
;
pMsg
->
contLen
=
msgL
en
;
pMsg
->
code
=
0
;
vDebug
(
"vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
,
consumerId
,
pReq
->
epoch
);
...
...
tests/script/jenkins/basic.txt
浏览文件 @
9fed9752
...
...
@@ -59,13 +59,13 @@
./test.sh -f tsim/tmq/oneTopic.sim
./test.sh -f tsim/tmq/multiTopic.sim
./test.sh -f tsim/tmq/mainConsumerInMultiTopic.sim
./test.sh -f tsim/tmq/mainConsumerInOneTopic.sim
#
./test.sh -f tsim/tmq/mainConsumerInMultiTopic.sim
#
./test.sh -f tsim/tmq/mainConsumerInOneTopic.sim
#fail ./test.sh -f tsim/tmq/main2Con1Cgrp1TopicFrCtb.sim
#fail ./test.sh -f tsim/tmq/main2Con1Cgrp1TopicFrStb.sim
./test.sh -f tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim
./test.sh -f tsim/tmq/main2Con1Cgrp2TopicFrStb.sim
#
./test.sh -f tsim/tmq/main2Con1Cgrp2TopicFrCtb.sim
#
./test.sh -f tsim/tmq/main2Con1Cgrp2TopicFrStb.sim
# --- stable
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录