Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a36cfba9
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
a36cfba9
编写于
3月 21, 2022
作者:
L
Liu Jicong
提交者:
GitHub
3月 21, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10879 from taosdata/feature/tq
Feature/tq
上级
8738ef69
8a20b2fe
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
126 addition
and
111 deletion
+126
-111
include/common/tcommon.h
include/common/tcommon.h
+31
-28
include/common/tmsg.h
include/common/tmsg.h
+24
-30
source/client/src/tmq.c
source/client/src/tmq.c
+45
-39
source/common/src/tmsg.c
source/common/src/tmsg.c
+4
-0
source/dnode/mgmt/vnode/src/vmWorker.c
source/dnode/mgmt/vnode/src/vmWorker.c
+3
-2
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+10
-7
source/dnode/snode/src/snode.c
source/dnode/snode/src/snode.c
+4
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+3
-2
source/dnode/vnode/src/vnd/vnodeInt.c
source/dnode/vnode/src/vnd/vnodeInt.c
+2
-2
未找到文件。
include/common/tcommon.h
浏览文件 @
a36cfba9
...
...
@@ -54,25 +54,28 @@ typedef struct SColumnDataAgg {
}
SColumnDataAgg
;
typedef
struct
SDataBlockInfo
{
STimeWindow
window
;
int32_t
rows
;
int32_t
rowSize
;
int16_t
numOfCols
;
int16_t
hasVarCol
;
union
{
int64_t
uid
;
int64_t
blockId
;};
STimeWindow
window
;
int32_t
rows
;
int32_t
rowSize
;
int16_t
numOfCols
;
int16_t
hasVarCol
;
union
{
int64_t
uid
;
int64_t
blockId
;
};
}
SDataBlockInfo
;
//typedef struct SConstantItem {
// SColumnInfo info;
// int32_t startRow; // run-length-encoding to save the space for multiple rows
// int32_t endRow;
// SVariant value;
//} SConstantItem;
//
typedef struct SConstantItem {
//
SColumnInfo info;
//
int32_t startRow; // run-length-encoding to save the space for multiple rows
//
int32_t endRow;
//
SVariant value;
//
} SConstantItem;
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef
struct
SSDataBlock
{
SColumnDataAgg
*
pBlockAgg
;
SArray
*
pDataBlock
;
// SArray<SColumnInfoData>
SColumnDataAgg
*
pBlockAgg
;
SArray
*
pDataBlock
;
// SArray<SColumnInfoData>
SDataBlockInfo
info
;
}
SSDataBlock
;
...
...
@@ -108,13 +111,13 @@ static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
static
FORCE_INLINE
int32_t
tEncodeSMqPollRsp
(
void
**
buf
,
const
SMqPollRsp
*
pRsp
)
{
int32_t
tlen
=
0
;
int32_t
sz
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
consumerId
);
//
tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
reqOffset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
rspOffset
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
skipLogNum
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
numOfTopics
);
if
(
pRsp
->
numOfTopics
==
0
)
return
tlen
;
tlen
+=
tEncodeSSchemaWrapper
(
buf
,
pRsp
->
schema
s
);
tlen
+=
tEncodeSSchemaWrapper
(
buf
,
pRsp
->
schema
);
if
(
pRsp
->
pBlockData
)
{
sz
=
taosArrayGetSize
(
pRsp
->
pBlockData
);
}
...
...
@@ -128,15 +131,15 @@ static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp
static
FORCE_INLINE
void
*
tDecodeSMqPollRsp
(
void
*
buf
,
SMqPollRsp
*
pRsp
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
consumerId
);
//
buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
reqOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
rspOffset
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
skipLogNum
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
numOfTopics
);
if
(
pRsp
->
numOfTopics
==
0
)
return
buf
;
pRsp
->
schema
s
=
(
SSchemaWrapper
*
)
calloc
(
1
,
sizeof
(
SSchemaWrapper
));
if
(
pRsp
->
schema
s
==
NULL
)
return
NULL
;
buf
=
tDecodeSSchemaWrapper
(
buf
,
pRsp
->
schema
s
);
pRsp
->
schema
=
(
SSchemaWrapper
*
)
calloc
(
1
,
sizeof
(
SSchemaWrapper
));
if
(
pRsp
->
schema
==
NULL
)
return
NULL
;
buf
=
tDecodeSSchemaWrapper
(
buf
,
pRsp
->
schema
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pRsp
->
pBlockData
=
taosArrayInit
(
sz
,
sizeof
(
SSDataBlock
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
...
...
@@ -148,11 +151,11 @@ static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) {
}
static
FORCE_INLINE
void
tDeleteSMqConsumeRsp
(
SMqPollRsp
*
pRsp
)
{
if
(
pRsp
->
schema
s
)
{
if
(
pRsp
->
schema
s
->
nCols
)
{
tfree
(
pRsp
->
schema
s
->
pSchema
);
if
(
pRsp
->
schema
)
{
if
(
pRsp
->
schema
->
nCols
)
{
tfree
(
pRsp
->
schema
->
pSchema
);
}
free
(
pRsp
->
schema
s
);
free
(
pRsp
->
schema
);
}
taosArrayDestroyEx
(
pRsp
->
pBlockData
,
(
void
(
*
)(
void
*
))
tDeleteSSDataBlock
);
pRsp
->
pBlockData
=
NULL
;
...
...
@@ -196,7 +199,7 @@ typedef struct SGroupbyExpr {
typedef
struct
SFunctParam
{
int32_t
type
;
SColumn
*
pCol
;
SColumn
*
pCol
;
SVariant
param
;
}
SFunctParam
;
...
...
@@ -214,12 +217,12 @@ typedef struct SResSchame {
typedef
struct
SExprBasicInfo
{
SResSchema
resSchema
;
int16_t
numOfParams
;
// argument value of each function
SFunctParam
*
pParam
;
SFunctParam
*
pParam
;
}
SExprBasicInfo
;
typedef
struct
SExprInfo
{
struct
SExprBasicInfo
base
;
struct
tExprNode
*
pExpr
;
struct
SExprBasicInfo
base
;
struct
tExprNode
*
pExpr
;
}
SExprInfo
;
typedef
struct
SStateWindow
{
...
...
include/common/tmsg.h
浏览文件 @
a36cfba9
...
...
@@ -1282,7 +1282,7 @@ static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
if
(
pRebSub
==
NULL
)
{
goto
_err
;
}
pRebSub
->
key
=
key
;
pRebSub
->
key
=
strdup
(
key
)
;
pRebSub
->
lostConsumers
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
if
(
pRebSub
->
lostConsumers
==
NULL
)
{
goto
_err
;
...
...
@@ -2116,25 +2116,16 @@ typedef struct {
int8_t
mqMsgType
;
int32_t
code
;
int32_t
epoch
;
int64_t
consumerId
;
}
SMqRspHead
;
typedef
struct
{
int64_t
consumerId
;
SSchemaWrapper
*
schemas
;
int64_t
reqOffset
;
int64_t
rspOffset
;
int32_t
skipLogNum
;
int32_t
numOfTopics
;
SArray
*
pBlockData
;
// SArray<SSDataBlock>
}
SMqPollRsp
;
// one req for one vg+topic
typedef
struct
{
SMsgHead
head
;
int64_t
consumerId
;
int64_t
blockingTime
;
int32_t
epoch
;
int8_t
withSchema
;
char
cgroup
[
TSDB_CGROUP_LEN
];
int64_t
currentOffset
;
...
...
@@ -2153,19 +2144,21 @@ typedef struct {
}
SMqSubTopicEp
;
typedef
struct
{
int64_t
consumerId
;
char
cgroup
[
TSDB_CGROUP_LEN
];
SArray
*
topics
;
// SArray<SMqSubTopicEp>
}
SMqCMGetSubEpRsp
;
SMqRspHead
head
;
// TODO: remove from msg
int64_t
reqOffset
;
int64_t
rspOffset
;
int32_t
skipLogNum
;
int32_t
numOfTopics
;
SSchemaWrapper
*
schema
;
SArray
*
pBlockData
;
// SArray<SSDataBlock>
}
SMqPollRsp
;
typedef
struct
{
SMqRspHead
head
;
union
{
SMqPollRsp
consumeRsp
;
SMqCMGetSubEpRsp
getEpRsp
;
};
void
*
extra
;
}
SMqMsgWrapper
;
char
cgroup
[
TSDB_CGROUP_LEN
];
SArray
*
topics
;
// SArray<SMqSubTopicEp>
}
SMqCMGetSubEpRsp
;
typedef
struct
{
int32_t
curBlock
;
...
...
@@ -2173,11 +2166,13 @@ typedef struct {
void
**
uData
;
}
SMqRowIter
;
struct
tmq_message_t_v1
{
SMqPollRsp
rsp
;
struct
tmq_message_t
{
SMqPollRsp
msg
;
void
*
vg
;
SMqRowIter
iter
;
};
#if 0
struct tmq_message_t {
SMqRspHead head;
union {
...
...
@@ -2189,6 +2184,7 @@ struct tmq_message_t {
int32_t curRow;
void** uData;
};
#endif
static
FORCE_INLINE
void
tDeleteSMqSubTopicEp
(
SMqSubTopicEp
*
pSubTopicEp
)
{
taosArrayDestroy
(
pSubTopicEp
->
vgs
);
}
...
...
@@ -2241,8 +2237,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
static
FORCE_INLINE
int32_t
tEncodeSMqCMGetSubEpRsp
(
void
**
buf
,
const
SMqCMGetSubEpRsp
*
pRsp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
consumerId
);
tlen
+=
taosEncodeString
(
buf
,
pRsp
->
cgroup
);
// tlen += taosEncodeString(buf, pRsp->cgroup);
int32_t
sz
=
taosArrayGetSize
(
pRsp
->
topics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
...
...
@@ -2253,8 +2248,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu
}
static
FORCE_INLINE
void
*
tDecodeSMqCMGetSubEpRsp
(
void
*
buf
,
SMqCMGetSubEpRsp
*
pRsp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
consumerId
);
buf
=
taosDecodeStringTo
(
buf
,
pRsp
->
cgroup
);
// buf = taosDecodeStringTo(buf, pRsp->cgroup);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pRsp
->
topics
=
taosArrayInit
(
sz
,
sizeof
(
SMqSubTopicEp
));
...
...
@@ -2275,8 +2269,8 @@ enum {
};
typedef
struct
{
void
*
inputHandle
;
void
*
*
executor
;
void
*
inputHandle
;
void
*
executor
[
4
]
;
}
SStreamTaskParRunner
;
typedef
struct
{
...
...
source/client/src/tmq.c
浏览文件 @
a36cfba9
...
...
@@ -681,7 +681,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
int32_t
tmqGetSkipLogNum
(
tmq_message_t
*
tmq_message
)
{
if
(
tmq_message
==
NULL
)
return
0
;
SMqPollRsp
*
pRsp
=
&
tmq_message
->
consumeRsp
;
SMqPollRsp
*
pRsp
=
&
tmq_message
->
msg
;
return
pRsp
->
skipLogNum
;
}
...
...
@@ -690,15 +690,15 @@ void tmqShowMsg(tmq_message_t* tmq_message) {
static
bool
noPrintSchema
;
char
pBuf
[
128
];
SMqPollRsp
*
pRsp
=
&
tmq_message
->
consumeRsp
;
int32_t
colNum
=
pRsp
->
schema
s
->
nCols
;
SMqPollRsp
*
pRsp
=
&
tmq_message
->
msg
;
int32_t
colNum
=
pRsp
->
schema
->
nCols
;
if
(
!
noPrintSchema
)
{
printf
(
"|"
);
for
(
int32_t
i
=
0
;
i
<
colNum
;
i
++
)
{
if
(
i
==
0
)
printf
(
" %25s |"
,
pRsp
->
schema
s
->
pSchema
[
i
].
name
);
printf
(
" %25s |"
,
pRsp
->
schema
->
pSchema
[
i
].
name
);
else
printf
(
" %15s |"
,
pRsp
->
schema
s
->
pSchema
[
i
].
name
);
printf
(
" %15s |"
,
pRsp
->
schema
->
pSchema
[
i
].
name
);
}
printf
(
"
\n
"
);
printf
(
"===============================================
\n
"
);
...
...
@@ -778,19 +778,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
goto
WRITE_QUEUE_FAIL
;
}
memcpy
(
pRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqPollRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRsp
->
consumeRsp
);
pRsp
->
curBlock
=
0
;
pRsp
->
curRow
=
0
;
tDecodeSMqPollRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRsp
->
msg
);
pRsp
->
iter
.
curBlock
=
0
;
pRsp
->
iter
.
curRow
=
0
;
// TODO: alloc mem
/*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
if
(
pRsp
->
consumeRsp
.
numOfTopics
==
0
)
{
if
(
pRsp
->
msg
.
numOfTopics
==
0
)
{
/*printf("no data\n");*/
taosFreeQitem
(
pRsp
);
goto
WRITE_QUEUE_FAIL
;
}
pRsp
->
extra
=
pParam
->
pVg
;
pRsp
->
vg
=
pParam
->
pVg
;
taosWriteQitem
(
tmq
->
mqueue
,
pRsp
);
atomic_add_fetch_32
(
&
tmq
->
readyRequest
,
1
);
tsem_post
(
&
tmq
->
rspSem
);
...
...
@@ -860,14 +860,14 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
tDeleteSMqCMGetSubEpRsp
(
&
rsp
);
}
else
{
tmq_message_t
*
pRsp
=
taosAllocateQitem
(
sizeof
(
tmq_message_t
));
SMqCMGetSubEpRsp
*
pRsp
=
taosAllocateQitem
(
sizeof
(
SMqCMGetSubEpRsp
));
if
(
pRsp
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
code
=
-
1
;
goto
END
;
}
memcpy
(
pRsp
,
pMsg
->
pData
,
sizeof
(
SMqRspHead
));
tDecodeSMqCMGetSubEpRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
&
pRsp
->
getE
pRsp
);
tDecodeSMqCMGetSubEpRsp
(
POINTER_SHIFT
(
pMsg
->
pData
,
sizeof
(
SMqRspHead
)),
pRsp
);
taosWriteQitem
(
tmq
->
mqueue
,
pRsp
);
tsem_post
(
&
tmq
->
rspSem
);
...
...
@@ -983,6 +983,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo
return
pReq
;
}
#if 0
tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
tmq_message_t* msg = NULL;
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
...
...
@@ -1050,6 +1051,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) {
}
return NULL;
}
#endif
int32_t
tmqPollImpl
(
tmq_t
*
tmq
,
int64_t
blockingTime
)
{
/*printf("call poll\n");*/
...
...
@@ -1111,11 +1113,12 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
}
// return
int32_t
tmqHandleRes
(
tmq_t
*
tmq
,
tmq_message_t
*
rspMsg
,
bool
*
pReset
)
{
if
(
rsp
Msg
->
head
.
mqMsgType
==
TMQ_MSG_TYPE__EP_RSP
)
{
int32_t
tmqHandleRes
(
tmq_t
*
tmq
,
SMqRspHead
*
rspHead
,
bool
*
pReset
)
{
if
(
rsp
Head
->
mqMsgType
==
TMQ_MSG_TYPE__EP_RSP
)
{
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
if
(
rspMsg
->
head
.
epoch
>
atomic_load_32
(
&
tmq
->
epoch
))
{
tmqUpdateEp
(
tmq
,
rspMsg
->
head
.
epoch
,
&
rspMsg
->
getEpRsp
);
if
(
rspHead
->
epoch
>
atomic_load_32
(
&
tmq
->
epoch
))
{
SMqCMGetSubEpRsp
*
rspMsg
=
(
SMqCMGetSubEpRsp
*
)
rspHead
;
tmqUpdateEp
(
tmq
,
rspHead
->
epoch
,
rspMsg
);
tmqClearUnhandleMsg
(
tmq
);
*
pReset
=
true
;
}
else
{
...
...
@@ -1129,21 +1132,22 @@ int32_t tmqHandleRes(tmq_t* tmq, tmq_message_t* rspMsg, bool* pReset) {
tmq_message_t
*
tmqHandleAllRsp
(
tmq_t
*
tmq
,
int64_t
blockingTime
,
bool
pollIfReset
)
{
while
(
1
)
{
tmq_message_t
*
rspMsg
=
NULL
;
taosGetQitem
(
tmq
->
qall
,
(
void
**
)
&
rsp
Msg
);
if
(
rsp
Msg
==
NULL
)
{
SMqRspHead
*
rspHead
=
NULL
;
taosGetQitem
(
tmq
->
qall
,
(
void
**
)
&
rsp
Head
);
if
(
rsp
Head
==
NULL
)
{
taosReadAllQitems
(
tmq
->
mqueue
,
tmq
->
qall
);
taosGetQitem
(
tmq
->
qall
,
(
void
**
)
&
rsp
Msg
);
if
(
rsp
Msg
==
NULL
)
return
NULL
;
taosGetQitem
(
tmq
->
qall
,
(
void
**
)
&
rsp
Head
);
if
(
rsp
Head
==
NULL
)
return
NULL
;
}
if
(
rspMsg
->
head
.
mqMsgType
==
TMQ_MSG_TYPE__POLL_RSP
)
{
if
(
rspHead
->
mqMsgType
==
TMQ_MSG_TYPE__POLL_RSP
)
{
tmq_message_t
*
rspMsg
=
(
tmq_message_t
*
)
rspHead
;
atomic_sub_fetch_32
(
&
tmq
->
readyRequest
,
1
);
/*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/
if
(
rspMsg
->
head
.
epoch
==
atomic_load_32
(
&
tmq
->
epoch
))
{
if
(
rspMsg
->
msg
.
head
.
epoch
==
atomic_load_32
(
&
tmq
->
epoch
))
{
/*printf("epoch match\n");*/
SMqClientVg
*
pVg
=
rspMsg
->
extra
;
pVg
->
currentOffset
=
rspMsg
->
consumeRsp
.
rspOffset
;
SMqClientVg
*
pVg
=
rspMsg
->
vg
;
pVg
->
currentOffset
=
rspMsg
->
msg
.
rspOffset
;
atomic_store_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
);
return
rspMsg
;
}
else
{
...
...
@@ -1153,8 +1157,8 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
}
else
{
/*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/
bool
reset
=
false
;
tmqHandleRes
(
tmq
,
rsp
Msg
,
&
reset
);
taosFreeQitem
(
rsp
Msg
);
tmqHandleRes
(
tmq
,
rsp
Head
,
&
reset
);
taosFreeQitem
(
rsp
Head
);
if
(
pollIfReset
&&
reset
)
{
printf
(
"reset and repoll
\n
"
);
tmqPollImpl
(
tmq
,
blockingTime
);
...
...
@@ -1163,6 +1167,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
}
}
#if 0
tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* rspMsg = NULL;
int64_t startTime = taosGetTimestampMs();
...
...
@@ -1185,6 +1190,7 @@ tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
return NULL;
}
}
#endif
tmq_message_t
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
blocking_time
)
{
tmq_message_t
*
rspMsg
;
...
...
@@ -1350,7 +1356,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v
void
tmq_message_destroy
(
tmq_message_t
*
tmq_message
)
{
if
(
tmq_message
==
NULL
)
return
;
SMqPollRsp
*
pRsp
=
&
tmq_message
->
consumeRsp
;
SMqPollRsp
*
pRsp
=
&
tmq_message
->
msg
;
tDeleteSMqConsumeRsp
(
pRsp
);
/*free(tmq_message);*/
taosFreeQitem
(
tmq_message
);
...
...
@@ -1366,24 +1372,24 @@ const char* tmq_err2str(tmq_resp_err_t err) {
}
TAOS_ROW
tmq_get_row
(
tmq_message_t
*
message
)
{
SMqPollRsp
*
rsp
=
&
message
->
consumeRsp
;
SMqPollRsp
*
rsp
=
&
message
->
msg
;
while
(
1
)
{
if
(
message
->
curBlock
<
taosArrayGetSize
(
rsp
->
pBlockData
))
{
SSDataBlock
*
pBlock
=
taosArrayGet
(
rsp
->
pBlockData
,
message
->
curBlock
);
if
(
message
->
curRow
<
pBlock
->
info
.
rows
)
{
if
(
message
->
iter
.
curBlock
<
taosArrayGetSize
(
rsp
->
pBlockData
))
{
SSDataBlock
*
pBlock
=
taosArrayGet
(
rsp
->
pBlockData
,
message
->
iter
.
curBlock
);
if
(
message
->
iter
.
curRow
<
pBlock
->
info
.
rows
)
{
for
(
int
i
=
0
;
i
<
pBlock
->
info
.
numOfCols
;
i
++
)
{
SColumnInfoData
*
pData
=
taosArrayGet
(
pBlock
->
pDataBlock
,
i
);
if
(
colDataIsNull_s
(
pData
,
message
->
curRow
))
message
->
uData
[
i
]
=
NULL
;
if
(
colDataIsNull_s
(
pData
,
message
->
iter
.
curRow
))
message
->
iter
.
uData
[
i
]
=
NULL
;
else
{
message
->
uData
[
i
]
=
colDataGetData
(
pData
,
message
->
curRow
);
message
->
iter
.
uData
[
i
]
=
colDataGetData
(
pData
,
message
->
iter
.
curRow
);
}
}
message
->
curRow
++
;
return
message
->
uData
;
message
->
iter
.
curRow
++
;
return
message
->
iter
.
uData
;
}
else
{
message
->
curBlock
++
;
message
->
curRow
=
0
;
message
->
iter
.
curBlock
++
;
message
->
iter
.
curRow
=
0
;
continue
;
}
}
...
...
source/common/src/tmsg.c
浏览文件 @
a36cfba9
...
...
@@ -2720,6 +2720,8 @@ int32_t tEncodeSStreamTask(SCoder *pEncoder, const SStreamTask *pTask) {
if
(
tEncodeI32
(
pEncoder
,
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tEncodeI32
(
pEncoder
,
pTask
->
level
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
pipeEnd
)
<
0
)
return
-
1
;
if
(
tEncodeI8
(
pEncoder
,
pTask
->
parallel
)
<
0
)
return
-
1
;
if
(
tEncodeSEpSet
(
pEncoder
,
&
pTask
->
NextOpEp
)
<
0
)
return
-
1
;
if
(
tEncodeCStr
(
pEncoder
,
pTask
->
qmsg
)
<
0
)
return
-
1
;
tEndEncode
(
pEncoder
);
...
...
@@ -2732,6 +2734,8 @@ int32_t tDecodeSStreamTask(SCoder *pDecoder, SStreamTask *pTask) {
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
taskId
)
<
0
)
return
-
1
;
if
(
tDecodeI32
(
pDecoder
,
&
pTask
->
level
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
status
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
pipeEnd
)
<
0
)
return
-
1
;
if
(
tDecodeI8
(
pDecoder
,
&
pTask
->
parallel
)
<
0
)
return
-
1
;
if
(
tDecodeSEpSet
(
pDecoder
,
&
pTask
->
NextOpEp
)
<
0
)
return
-
1
;
if
(
tDecodeCStrAlloc
(
pDecoder
,
&
pTask
->
qmsg
)
<
0
)
return
-
1
;
tEndDecode
(
pDecoder
);
...
...
source/dnode/mgmt/vnode/src/vmWorker.c
浏览文件 @
a36cfba9
...
...
@@ -77,8 +77,9 @@ static void vmProcessFetchQueue(SVnodeObj *pVnode, SNodeMsg *pMsg) {
}
dTrace
(
"msg:%p, is freed, result:0x%04x:%s"
,
pMsg
,
code
&
0XFFFF
,
tstrerror
(
code
));
rpcFreeCont
(
pMsg
->
rpcMsg
.
pCont
);
taosFreeQitem
(
pMsg
);
// TODO: handle invalid write
/*rpcFreeCont(pMsg->rpcMsg.pCont);*/
/*taosFreeQitem(pMsg);*/
}
static
void
vmProcessWriteQueue
(
SVnodeObj
*
pVnode
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
a36cfba9
...
...
@@ -272,7 +272,6 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
/*sdbWrite(pMnode->pSdb, pConsumerRaw);*/
strcpy
(
rsp
.
cgroup
,
pReq
->
cgroup
);
rsp
.
consumerId
=
consumerId
;
if
(
epoch
!=
pConsumer
->
epoch
)
{
mInfo
(
"send new assignment to consumer, consumer epoch %d, server epoch %d"
,
epoch
,
pConsumer
->
epoch
);
SArray
*
pTopics
=
pConsumer
->
currentTopics
;
...
...
@@ -322,6 +321,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
}
((
SMqRspHead
*
)
buf
)
->
mqMsgType
=
TMQ_MSG_TYPE__EP_RSP
;
((
SMqRspHead
*
)
buf
)
->
epoch
=
pConsumer
->
epoch
;
((
SMqRspHead
*
)
buf
)
->
consumerId
=
pConsumer
->
consumerId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
tEncodeSMqCMGetSubEpRsp
(
&
abuf
,
&
rsp
);
...
...
@@ -344,14 +344,14 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup)
}
static
SMqRebSubscribe
*
mndGetOrCreateRebSub
(
SHashObj
*
pHash
,
const
char
*
key
)
{
SMqRebSubscribe
*
pRebSub
=
taosHashGet
(
pHash
,
key
,
strlen
(
key
));
SMqRebSubscribe
*
pRebSub
=
taosHashGet
(
pHash
,
key
,
strlen
(
key
)
+
1
);
if
(
pRebSub
==
NULL
)
{
pRebSub
=
tNewSMqRebSubscribe
(
key
);
if
(
pRebSub
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
NULL
;
}
taosHashPut
(
pHash
,
key
,
strlen
(
key
),
pRebSub
,
sizeof
(
SMqRebSubscribe
));
taosHashPut
(
pHash
,
key
,
strlen
(
key
)
+
1
,
pRebSub
,
sizeof
(
SMqRebSubscribe
));
}
return
pRebSub
;
}
...
...
@@ -441,6 +441,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
if
(
pIter
==
NULL
)
break
;
SMqRebSubscribe
*
pRebSub
=
(
SMqRebSubscribe
*
)
pIter
;
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribeByKey
(
pMnode
,
pRebSub
->
key
);
tfree
(
pRebSub
->
key
);
mInfo
(
"mq rebalance subscription: %s"
,
pSub
->
key
);
...
...
@@ -503,7 +504,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
atomic_store_32
(
&
pRebConsumer
->
status
,
MQ_CONSUMER_STATUS__IDLE
);
}
mInfo
(
"mq consumer:%"
PRId64
", status change from %d to %d"
,
pRebConsumer
->
consumerId
,
status
,
pRebConsumer
->
status
);
mInfo
(
"mq consumer:%"
PRId64
", status change from %d to %d"
,
pRebConsumer
->
consumerId
,
status
,
pRebConsumer
->
status
);
SSdbRaw
*
pConsumerRaw
=
mndConsumerActionEncode
(
pRebConsumer
);
sdbSetRawStatus
(
pConsumerRaw
,
SDB_STATUS_READY
);
...
...
@@ -543,8 +545,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
mndPersistMqSetConnReq
(
pMnode
,
pTrans
,
pTopic
,
cgroup
,
pConsumerEp
);
mndReleaseTopic
(
pMnode
,
pTopic
);
}
else
{
mInfo
(
"mq rebalance: assign vgroup %d, from consumer %"
PRId64
" to consumer %"
PRId64
""
,
pConsumerEp
->
vgId
,
pConsumerEp
->
oldConsumerId
,
pConsumerEp
->
consumerId
);
mInfo
(
"mq rebalance: assign vgroup %d, from consumer %"
PRId64
" to consumer %"
PRId64
""
,
pConsumerEp
->
vgId
,
pConsumerEp
->
oldConsumerId
,
pConsumerEp
->
consumerId
);
mndPersistRebalanceMsg
(
pMnode
,
pTrans
,
pConsumerEp
);
}
...
...
@@ -1099,7 +1101,8 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
cgroup
,
newTopicName
);
bool
createSub
=
false
;
if
(
pSub
==
NULL
)
{
mDebug
(
"create new subscription by consumer %"
PRId64
", group: %s, topic %s"
,
consumerId
,
cgroup
,
newTopicName
);
mDebug
(
"create new subscription by consumer %"
PRId64
", group: %s, topic %s"
,
consumerId
,
cgroup
,
newTopicName
);
pSub
=
mndCreateSubscription
(
pMnode
,
pTopic
,
cgroup
);
createSub
=
true
;
...
...
source/dnode/snode/src/snode.c
浏览文件 @
a36cfba9
...
...
@@ -57,7 +57,9 @@ void sndMetaDelete(SStreamMeta *pMeta) {
}
int32_t
sndMetaDeployTask
(
SStreamMeta
*
pMeta
,
SStreamTask
*
pTask
)
{
pTask
->
runner
.
executor
=
qCreateStreamExecTaskInfo
(
pTask
->
qmsg
,
NULL
);
for
(
int
i
=
0
;
i
<
pTask
->
parallel
;
i
++
)
{
pTask
->
runner
.
executor
[
i
]
=
qCreateStreamExecTaskInfo
(
pTask
->
qmsg
,
NULL
);
}
return
taosHashPut
(
pMeta
->
pHash
,
&
pTask
->
taskId
,
sizeof
(
int32_t
),
pTask
,
sizeof
(
void
*
));
}
...
...
@@ -95,6 +97,7 @@ void sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTask
*
pTask
=
malloc
(
sizeof
(
SStreamTask
));
if
(
pTask
==
NULL
)
{
ASSERT
(
0
);
return
;
}
SCoder
decoder
;
tCoderInit
(
&
decoder
,
TD_LITTLE_ENDIAN
,
msg
,
pMsg
->
contLen
-
sizeof
(
SMsgHead
),
TD_DECODER
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
a36cfba9
...
...
@@ -245,7 +245,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
SMqPollRsp
rsp
=
{
.
consumerId
=
consumerId
,
/*.consumerId = consumerId,*/
.
numOfTopics
=
0
,
.
pBlockData
=
NULL
,
};
...
...
@@ -298,7 +298,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
taosArrayPush
(
pRes
,
pDataBlock
);
rsp
.
schema
s
=
pTopic
->
buffer
.
output
[
pos
].
pReadHandle
->
pSchemaWrapper
;
rsp
.
schema
=
pTopic
->
buffer
.
output
[
pos
].
pReadHandle
->
pSchemaWrapper
;
rsp
.
rspOffset
=
fetchOffset
;
rsp
.
numOfTopics
=
1
;
...
...
@@ -312,6 +312,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
}
((
SMqRspHead
*
)
buf
)
->
mqMsgType
=
TMQ_MSG_TYPE__POLL_RSP
;
((
SMqRspHead
*
)
buf
)
->
epoch
=
pReq
->
epoch
;
((
SMqRspHead
*
)
buf
)
->
consumerId
=
consumerId
;
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMqRspHead
));
tEncodeSMqPollRsp
(
&
abuf
,
&
rsp
);
...
...
source/dnode/vnode/src/vnd/vnodeInt.c
浏览文件 @
a36cfba9
...
...
@@ -14,8 +14,8 @@
*/
#define _DEFAULT_SOURCE
#include "vnd.h"
#include "sync.h"
#include "vnd.h"
// #include "vnodeInt.h"
int32_t
vnodeAlter
(
SVnode
*
pVnode
,
const
SVnodeCfg
*
pCfg
)
{
return
0
;
}
...
...
@@ -41,6 +41,6 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
}
int
vnodeProcessSyncReq
(
SVnode
*
pVnode
,
SRpcMsg
*
pMsg
,
SRpcMsg
**
pRsp
)
{
vInfo
(
"sync message is processed"
);
/*vInfo("sync message is processed");*/
return
0
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录