Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e812a659
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
e812a659
编写于
7月 18, 2023
作者:
wmmhello
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix:add tmq_position() interface & optimize commit logic
上级
6762a01d
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
302 addition
and
188 deletion
+302
-188
include/util/taoserror.h
include/util/taoserror.h
+1
-0
source/client/src/clientTmq.c
source/client/src/clientTmq.c
+300
-188
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/util/taoserror.h
浏览文件 @
e812a659
...
...
@@ -778,6 +778,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE TAOS_DEF_ERROR_CODE(0, 0x4007)
#define TSDB_CODE_TMQ_INVALID_VGID TAOS_DEF_ERROR_CODE(0, 0x4008)
#define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009)
#define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
...
...
source/client/src/clientTmq.c
浏览文件 @
e812a659
...
...
@@ -139,8 +139,8 @@ enum {
typedef
struct
SVgOffsetInfo
{
STqOffsetVal
committedOffset
;
STqOffsetVal
currentOffset
;
STqOffsetVal
seekOffset
;
// the first version in block for seek operation
STqOffsetVal
endOffset
;
// the last version in TAOS_RES + 1
STqOffsetVal
beginOffset
;
// the first version in TAOS_RES
int64_t
walVerBegin
;
int64_t
walVerEnd
;
}
SVgOffsetInfo
;
...
...
@@ -255,8 +255,7 @@ typedef struct SSyncCommitInfo {
static
int32_t
doAskEp
(
tmq_t
*
tmq
);
static
int32_t
makeTopicVgroupKey
(
char
*
dst
,
const
char
*
topicName
,
int32_t
vg
);
static
int32_t
tmqCommitDone
(
SMqCommitCbParamSet
*
pParamSet
);
static
int32_t
doSendCommitMsg
(
tmq_t
*
tmq
,
SMqClientVg
*
pVg
,
const
char
*
pTopicName
,
SMqCommitCbParamSet
*
pParamSet
,
int32_t
index
,
int32_t
totalVgroups
,
int32_t
type
);
static
int32_t
doSendCommitMsg
(
tmq_t
*
tmq
,
int32_t
vgId
,
SEpSet
*
epSet
,
STqOffsetVal
*
offset
,
const
char
*
pTopicName
,
SMqCommitCbParamSet
*
pParamSet
);
static
void
commitRspCountDown
(
SMqCommitCbParamSet
*
pParamSet
,
int64_t
consumerId
,
const
char
*
pTopic
,
int32_t
vgId
);
static
void
asyncAskEp
(
tmq_t
*
pTmq
,
__tmq_askep_fn_t
askEpFn
,
void
*
param
);
static
void
addToQueueCallbackFn
(
tmq_t
*
pTmq
,
int32_t
code
,
SDataBuf
*
pDataBuf
,
void
*
param
);
...
...
@@ -429,69 +428,10 @@ char** tmq_list_to_c_array(const tmq_list_t* list) {
return
container
->
pData
;
}
//static SMqClientVg* foundClientVg(SArray* pTopicList, const char* pName, int32_t vgId, int32_t* index,
// int32_t* numOfVgroups) {
// int32_t numOfTopics = taosArrayGetSize(pTopicList);
// *index = -1;
// *numOfVgroups = 0;
//
// for (int32_t i = 0; i < numOfTopics; ++i) {
// SMqClientTopic* pTopic = taosArrayGet(pTopicList, i);
// if (strcmp(pTopic->topicName, pName) != 0) {
// continue;
// }
//
// *numOfVgroups = taosArrayGetSize(pTopic->vgs);
// for (int32_t j = 0; j < (*numOfVgroups); ++j) {
// SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, j);
// if (pClientVg->vgId == vgId) {
// *index = j;
// return pClientVg;
// }
// }
// }
//
// return NULL;
//}
// Two problems do not need to be addressed here
// 1. update to of epset. the response of poll request will automatically handle this problem
// 2. commit failure. This one needs to be resolved.
static
int32_t
tmqCommitCb
(
void
*
param
,
SDataBuf
*
pBuf
,
int32_t
code
)
{
SMqCommitCbParam
*
pParam
=
(
SMqCommitCbParam
*
)
param
;
SMqCommitCbParamSet
*
pParamSet
=
(
SMqCommitCbParamSet
*
)
pParam
->
params
;
// if (code != TSDB_CODE_SUCCESS) { // if commit offset failed, let's try again
// taosThreadMutexLock(&pParam->pTmq->lock);
// int32_t numOfVgroups, index;
// SMqClientVg* pVg = foundClientVg(pParam->pTmq->clientTopics, pParam->topicName, pParam->vgId, &index,
// &numOfVgroups); if (pVg == NULL) {
// tscDebug("consumer:0x%" PRIx64
// " subKey:%s vgId:%d commit failed, code:%s has been transferred to other consumer, no need retry
// ordinal:%d/%d", pParam->pTmq->consumerId, pParam->pOffset->subKey, pParam->vgId, tstrerror(code),
// index + 1, numOfVgroups);
// } else { // let's retry the commit
// int32_t code1 = doSendCommitMsg(pParam->pTmq, pVg, pParam->topicName, pParamSet, index, numOfVgroups);
// if (code1 != TSDB_CODE_SUCCESS) { // retry failed.
// tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64
// " retry failed, ignore this commit. code:%s ordinal:%d/%d",
// pParam->pTmq->consumerId, pParam->topicName, pVg->vgId, pVg->offsetInfo.committedOffset.version,
// tstrerror(terrno), index + 1, numOfVgroups);
// }
// }
//
// taosThreadMutexUnlock(&pParam->pTmq->lock);
//
// taosMemoryFree(pParam->pOffset);
// taosMemoryFree(pBuf->pData);
// taosMemoryFree(pBuf->pEpSet);
//
// commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
// return 0;
// }
//
// // todo replace the pTmq with refId
taosMemoryFree
(
pParam
->
pOffset
);
taosMemoryFree
(
pBuf
->
pData
);
taosMemoryFree
(
pBuf
->
pEpSet
);
...
...
@@ -500,15 +440,14 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
return
0
;
}
static
int32_t
doSendCommitMsg
(
tmq_t
*
tmq
,
SMqClientVg
*
pVg
,
const
char
*
pTopicName
,
SMqCommitCbParamSet
*
pParamSet
,
int32_t
index
,
int32_t
totalVgroups
,
int32_t
type
)
{
static
int32_t
doSendCommitMsg
(
tmq_t
*
tmq
,
int32_t
vgId
,
SEpSet
*
epSet
,
STqOffsetVal
*
offset
,
const
char
*
pTopicName
,
SMqCommitCbParamSet
*
pParamSet
)
{
SMqVgOffset
*
pOffset
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqVgOffset
));
if
(
pOffset
==
NULL
)
{
return
TSDB_CODE_OUT_OF_MEMORY
;
}
pOffset
->
consumerId
=
tmq
->
consumerId
;
pOffset
->
offset
.
val
=
pVg
->
offsetInfo
.
currentO
ffset
;
pOffset
->
offset
.
val
=
*
o
ffset
;
int32_t
groupLen
=
strlen
(
tmq
->
groupId
);
memcpy
(
pOffset
->
offset
.
subKey
,
tmq
->
groupId
,
groupLen
);
...
...
@@ -519,6 +458,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
int32_t
code
=
0
;
tEncodeSize
(
tEncodeMqVgOffset
,
pOffset
,
len
,
code
);
if
(
code
<
0
)
{
taosMemoryFree
(
pOffset
);
return
TSDB_CODE_INVALID_PARA
;
}
...
...
@@ -528,7 +468,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
return
TSDB_CODE_OUT_OF_MEMORY
;
}
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
pVg
->
vgId
);
((
SMsgHead
*
)
buf
)
->
vgId
=
htonl
(
vgId
);
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
...
...
@@ -547,7 +487,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
pParam
->
params
=
pParamSet
;
pParam
->
pOffset
=
pOffset
;
pParam
->
vgId
=
pVg
->
vgId
;
pParam
->
vgId
=
vgId
;
pParam
->
pTmq
=
tmq
;
tstrncpy
(
pParam
->
topicName
,
pTopicName
,
tListLen
(
pParam
->
topicName
));
...
...
@@ -568,23 +508,16 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
pMsgSendInfo
->
param
=
pParam
;
pMsgSendInfo
->
paramFreeFp
=
taosMemoryFree
;
pMsgSendInfo
->
fp
=
tmqCommitCb
;
pMsgSendInfo
->
msgType
=
type
;
pMsgSendInfo
->
msgType
=
TDMT_VND_TMQ_COMMIT_OFFSET
;
atomic_add_fetch_32
(
&
pParamSet
->
waitingRspNum
,
1
);
atomic_add_fetch_32
(
&
pParamSet
->
totalRspNum
,
1
);
SEp
*
pEp
=
GET_ACTIVE_EP
(
&
pVg
->
epSet
);
char
offsetBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
offsetBuf
,
tListLen
(
offsetBuf
),
&
pOffset
->
offset
.
val
);
SEp
*
pEp
=
GET_ACTIVE_EP
(
epSet
);
char
commitBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
commitBuf
,
tListLen
(
commitBuf
),
&
pVg
->
offsetInfo
.
committedOffset
);
tscInfo
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d send offset:%s prev:%s, ep:%s:%d, ordinal:%d/%d, req:0x%"
PRIx64
,
tmq
->
consumerId
,
pOffset
->
offset
.
subKey
,
pVg
->
vgId
,
offsetBuf
,
commitBuf
,
pEp
->
fqdn
,
pEp
->
port
,
index
+
1
,
totalVgroups
,
pMsgSendInfo
->
requestId
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
pMsgSendInfo
);
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
epSet
,
&
transporterId
,
pMsgSendInfo
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -604,57 +537,28 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
return
NULL
;
}
static
void
asyncCommitOffset
(
tmq_t
*
tmq
,
const
TAOS_RES
*
pRes
,
int32_t
type
,
tmq_commit_cb
*
pCommitFp
,
void
*
userParam
)
{
char
*
pTopicName
=
NULL
;
int32_t
vgId
=
0
;
int32_t
code
=
0
;
if
(
pRes
==
NULL
||
tmq
==
NULL
)
{
pCommitFp
(
tmq
,
TSDB_CODE_INVALID_PARA
,
userParam
);
return
;
}
if
(
TD_RES_TMQ
(
pRes
))
{
SMqRspObj
*
pRspObj
=
(
SMqRspObj
*
)
pRes
;
pTopicName
=
pRspObj
->
topic
;
vgId
=
pRspObj
->
vgId
;
}
else
if
(
TD_RES_TMQ_META
(
pRes
))
{
SMqMetaRspObj
*
pMetaRspObj
=
(
SMqMetaRspObj
*
)
pRes
;
pTopicName
=
pMetaRspObj
->
topic
;
vgId
=
pMetaRspObj
->
vgId
;
}
else
if
(
TD_RES_TMQ_METADATA
(
pRes
))
{
SMqTaosxRspObj
*
pRspObj
=
(
SMqTaosxRspObj
*
)
pRes
;
pTopicName
=
pRspObj
->
topic
;
vgId
=
pRspObj
->
vgId
;
}
else
{
pCommitFp
(
tmq
,
TSDB_CODE_TMQ_INVALID_MSG
,
userParam
);
return
;
}
static
SMqCommitCbParamSet
*
prepareCommitCbParamSet
(
tmq_t
*
tmq
,
tmq_commit_cb
*
pCommitFp
,
void
*
userParam
,
int32_t
rspNum
){
SMqCommitCbParamSet
*
pParamSet
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqCommitCbParamSet
));
if
(
pParamSet
==
NULL
)
{
pCommitFp
(
tmq
,
TSDB_CODE_OUT_OF_MEMORY
,
userParam
);
return
;
return
NULL
;
}
pParamSet
->
refId
=
tmq
->
refId
;
pParamSet
->
epoch
=
tmq
->
epoch
;
pParamSet
->
callbackFn
=
pCommitFp
;
pParamSet
->
userParam
=
userParam
;
pParamSet
->
waitingRspNum
=
rspNum
;
taosRLockLatch
(
&
tmq
->
lock
);
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
tscDebug
(
"consumer:0x%"
PRIx64
" do manual commit offset for %s, vgId:%d"
,
tmq
->
consumerId
,
pTopicName
,
vgId
);
return
pParamSet
;
}
static
SMqClientVg
*
getClientVg
(
tmq_t
*
tmq
,
char
*
pTopicName
,
int32_t
vgId
){
SMqClientTopic
*
pTopic
=
getTopicByName
(
tmq
,
pTopicName
);
if
(
pTopic
==
NULL
)
{
tscWarn
(
"consumer:0x%"
PRIx64
" failed to find the specified topic:%s, total topics:%d"
,
tmq
->
consumerId
,
pTopicName
,
numOfTopics
);
taosMemoryFree
(
pParamSet
);
pCommitFp
(
tmq
,
TSDB_CODE_SUCCESS
,
userParam
);
taosRUnLockLatch
(
&
tmq
->
lock
);
return
;
tscWarn
(
"consumer:0x%"
PRIx64
" failed to find the specified topic:%s"
,
tmq
->
consumerId
,
pTopicName
);
return
NULL
;
}
int32_t
j
=
0
;
...
...
@@ -669,89 +573,150 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
if
(
j
==
numOfVgroups
)
{
tscWarn
(
"consumer:0x%"
PRIx64
" failed to find the specified vgId:%d, total Vgs:%d, topic:%s"
,
tmq
->
consumerId
,
vgId
,
numOfVgroups
,
pTopicName
);
taosMemoryFree
(
pParamSet
);
pCommitFp
(
tmq
,
TSDB_CODE_SUCCESS
,
userParam
);
taosRUnLockLatch
(
&
tmq
->
lock
);
return
;
return
NULL
;
}
SMqClientVg
*
pVg
=
(
SMqClientVg
*
)
taosArrayGet
(
pTopic
->
vgs
,
j
);
if
(
pVg
->
offsetInfo
.
currentOffset
.
type
>
0
&&
!
tOffsetEqual
(
&
pVg
->
offsetInfo
.
currentOffset
,
&
pVg
->
offsetInfo
.
committedOffset
))
{
code
=
doSendCommitMsg
(
tmq
,
pVg
,
pTopic
->
topicName
,
pParamSet
,
j
,
numOfVgroups
,
type
);
return
pVg
;
}
static
int32_t
asyncCommitOffset
(
tmq_t
*
tmq
,
char
*
pTopicName
,
int32_t
vgId
,
STqOffsetVal
*
offsetVal
,
tmq_commit_cb
*
pCommitFp
,
void
*
userParam
)
{
int32_t
code
=
0
;
tscInfo
(
"consumer:0x%"
PRIx64
" do manual commit offset for %s, vgId:%d"
,
tmq
->
consumerId
,
pTopicName
,
vgId
);
taosRLockLatch
(
&
tmq
->
lock
);
SMqClientVg
*
pVg
=
getClientVg
(
tmq
,
pTopicName
,
vgId
);
if
(
pVg
==
NULL
){
code
=
TSDB_CODE_TMQ_INVALID_VGID
;
goto
end
;
}
if
(
offsetVal
->
type
>
0
&&
!
tOffsetEqual
(
offsetVal
,
&
pVg
->
offsetInfo
.
committedOffset
))
{
char
offsetBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
offsetBuf
,
tListLen
(
offsetBuf
),
offsetVal
);
char
commitBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
commitBuf
,
tListLen
(
commitBuf
),
&
pVg
->
offsetInfo
.
committedOffset
);
// failed to commit, callback user function directly.
SMqCommitCbParamSet
*
pParamSet
=
prepareCommitCbParamSet
(
tmq
,
pCommitFp
,
userParam
,
0
);
if
(
pParamSet
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
code
=
doSendCommitMsg
(
tmq
,
pVg
->
vgId
,
&
pVg
->
epSet
,
&
pVg
->
offsetInfo
.
endOffset
,
pTopicName
,
pParamSet
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s"
,
tmq
->
consumerId
,
pTopicName
,
pVg
->
vgId
,
offsetBuf
,
commitBuf
,
tstrerror
(
terrno
));
taosMemoryFree
(
pParamSet
);
pCommitFp
(
tmq
,
code
,
userParam
)
;
goto
end
;
}
// update the offset value.
pVg
->
offsetInfo
.
committedOffset
=
pVg
->
offsetInfo
.
currentOffset
;
}
else
{
// do not perform commit, callback user function directly.
taosMemoryFree
(
pParamSet
);
pCommitFp
(
tmq
,
code
,
userParam
);
tscInfo
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s"
,
tmq
->
consumerId
,
pTopicName
,
pVg
->
vgId
,
offsetBuf
,
commitBuf
);
pVg
->
offsetInfo
.
committedOffset
=
*
offsetVal
;
}
end:
taosRUnLockLatch
(
&
tmq
->
lock
);
return
code
;
}
static
void
asyncCommitAllOffsets
(
tmq_t
*
tmq
,
tmq_commit_cb
*
pCommitFp
,
void
*
userParam
)
{
SMqCommitCbParamSet
*
pParamSet
=
taosMemoryCalloc
(
1
,
sizeof
(
SMqCommitCbParamSet
));
if
(
pParamSet
==
NULL
)
{
pCommitFp
(
tmq
,
TSDB_CODE_OUT_OF_MEMORY
,
userParam
);
return
;
static
void
asyncCommitFromResult
(
tmq_t
*
tmq
,
const
TAOS_RES
*
pRes
,
tmq_commit_cb
*
pCommitFp
,
void
*
userParam
){
char
*
pTopicName
=
NULL
;
int32_t
vgId
=
0
;
STqOffsetVal
offsetVal
=
{
0
};
int32_t
code
=
0
;
if
(
pRes
==
NULL
||
tmq
==
NULL
)
{
code
=
TSDB_CODE_INVALID_PARA
;
goto
end
;
}
pParamSet
->
refId
=
tmq
->
refId
;
pParamSet
->
epoch
=
tmq
->
epoch
;
pParamSet
->
callbackFn
=
pCommitFp
;
pParamSet
->
userParam
=
userParam
;
if
(
TD_RES_TMQ
(
pRes
))
{
SMqRspObj
*
pRspObj
=
(
SMqRspObj
*
)
pRes
;
pTopicName
=
pRspObj
->
topic
;
vgId
=
pRspObj
->
vgId
;
offsetVal
=
pRspObj
->
rsp
.
rspOffset
;
}
else
if
(
TD_RES_TMQ_META
(
pRes
))
{
SMqMetaRspObj
*
pMetaRspObj
=
(
SMqMetaRspObj
*
)
pRes
;
pTopicName
=
pMetaRspObj
->
topic
;
vgId
=
pMetaRspObj
->
vgId
;
offsetVal
=
pMetaRspObj
->
metaRsp
.
rspOffset
;
}
else
if
(
TD_RES_TMQ_METADATA
(
pRes
))
{
SMqTaosxRspObj
*
pRspObj
=
(
SMqTaosxRspObj
*
)
pRes
;
pTopicName
=
pRspObj
->
topic
;
vgId
=
pRspObj
->
vgId
;
offsetVal
=
pRspObj
->
rsp
.
rspOffset
;
}
else
{
code
=
TSDB_CODE_TMQ_INVALID_MSG
;
goto
end
;
}
code
=
asyncCommitOffset
(
tmq
,
pTopicName
,
vgId
,
&
offsetVal
,
pCommitFp
,
userParam
);
end:
if
(
code
!=
TSDB_CODE_SUCCESS
){
pCommitFp
(
tmq
,
code
,
userParam
);
}
}
static
void
asyncCommitAllOffsets
(
tmq_t
*
tmq
,
tmq_commit_cb
*
pCommitFp
,
void
*
userParam
)
{
int32_t
code
=
0
;
// init as 1 to prevent concurrency issue
pParamSet
->
waitingRspNum
=
1
;
SMqCommitCbParamSet
*
pParamSet
=
prepareCommitCbParamSet
(
tmq
,
pCommitFp
,
userParam
,
1
);
if
(
pParamSet
==
NULL
)
{
code
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
end
;
}
taosRLockLatch
(
&
tmq
->
lock
);
int32_t
numOfTopics
=
taosArrayGetSize
(
tmq
->
clientTopics
);
tsc
Debug
(
"consumer:0x%"
PRIx64
" start to commit offset for %d topics"
,
tmq
->
consumerId
,
numOfTopics
);
tsc
Info
(
"consumer:0x%"
PRIx64
" start to commit offset for %d topics"
,
tmq
->
consumerId
,
numOfTopics
);
for
(
int32_t
i
=
0
;
i
<
numOfTopics
;
i
++
)
{
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
i
);
int32_t
numOfVgroups
=
taosArrayGetSize
(
pTopic
->
vgs
);
tscDebug
(
"consumer:0x%"
PRIx64
" commit offset for topics:%s, numOfVgs:%d"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
numOfVgroups
);
tscInfo
(
"consumer:0x%"
PRIx64
" commit offset for topics:%s, numOfVgs:%d"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
numOfVgroups
);
for
(
int32_t
j
=
0
;
j
<
numOfVgroups
;
j
++
)
{
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
if
(
pVg
->
offsetInfo
.
currentOffset
.
type
>
0
&&
!
tOffsetEqual
(
&
pVg
->
offsetInfo
.
currentOffset
,
&
pVg
->
offsetInfo
.
committedOffset
))
{
int32_t
code
=
doSendCommitMsg
(
tmq
,
pVg
,
pTopic
->
topicName
,
pParamSet
,
j
,
numOfVgroups
,
TDMT_VND_TMQ_COMMIT_OFFSET
);
if
(
pVg
->
offsetInfo
.
endOffset
.
type
>
0
&&
!
tOffsetEqual
(
&
pVg
->
offsetInfo
.
endOffset
,
&
pVg
->
offsetInfo
.
committedOffset
))
{
char
offsetBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
offsetBuf
,
tListLen
(
offsetBuf
),
&
pVg
->
offsetInfo
.
endOffset
);
char
commitBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
commitBuf
,
tListLen
(
commitBuf
),
&
pVg
->
offsetInfo
.
committedOffset
);
code
=
doSendCommitMsg
(
tmq
,
pVg
->
vgId
,
&
pVg
->
epSet
,
&
pVg
->
offsetInfo
.
endOffset
,
pTopic
->
topicName
,
pParamSet
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"consumer:0x%"
PRIx64
" topic:%s vgId:%d offset:%"
PRId64
" failed, code:%s ordinal:%d/%d"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
pVg
->
offsetInfo
.
committedOffset
.
version
,
tstrerror
(
terrno
),
j
+
1
,
numOfVgroups
);
tscError
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s ordinal:%d/%d"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
offsetBuf
,
commitBuf
,
tstrerror
(
terrno
),
j
+
1
,
numOfVgroups
);
continue
;
}
// update the offset value.
pVg
->
offsetInfo
.
committedOffset
=
pVg
->
offsetInfo
.
currentOffset
;
tscInfo
(
"consumer:0x%"
PRIx64
" topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
offsetBuf
,
commitBuf
,
j
+
1
,
numOfVgroups
);
pVg
->
offsetInfo
.
committedOffset
=
pVg
->
offsetInfo
.
endOffset
;
}
else
{
tsc
Debug
(
"consumer:0x%"
PRIx64
" topic:%s vgId:%d, no commit, current:%"
PRId64
", ordinal:%d/%d"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
pVg
->
offsetInfo
.
current
Offset
.
version
,
j
+
1
,
numOfVgroups
);
tsc
Info
(
"consumer:0x%"
PRIx64
" topic:%s vgId:%d, no commit, current:%"
PRId64
", ordinal:%d/%d"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
pVg
->
offsetInfo
.
end
Offset
.
version
,
j
+
1
,
numOfVgroups
);
}
}
}
taosRUnLockLatch
(
&
tmq
->
lock
);
tscDebug
(
"consumer:0x%"
PRIx64
" total commit:%d for %d topics"
,
tmq
->
consumerId
,
pParamSet
->
waitingRspNum
-
1
,
numOfTopics
);
tscInfo
(
"consumer:0x%"
PRIx64
" total commit:%d for %d topics"
,
tmq
->
consumerId
,
pParamSet
->
waitingRspNum
-
1
,
numOfTopics
);
//
no
request is sent
if
(
pParamSet
->
totalRspNum
=
=
0
)
{
taosMemoryFree
(
pParamSet
);
pCommitFp
(
tmq
,
TSDB_CODE_SUCCESS
,
userParam
);
// request is sent
if
(
pParamSet
->
totalRspNum
!
=
0
)
{
// count down since waiting rsp num init as 1
commitRspCountDown
(
pParamSet
,
tmq
->
consumerId
,
""
,
0
);
return
;
}
// count down since waiting rsp num init as 1
commitRspCountDown
(
pParamSet
,
tmq
->
consumerId
,
""
,
0
);
end:
taosMemoryFree
(
pParamSet
);
pCommitFp
(
tmq
,
code
,
userParam
);
return
;
}
static
void
generateTimedTask
(
int64_t
refId
,
int32_t
type
)
{
...
...
@@ -827,7 +792,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
OffsetRows
*
offRows
=
taosArrayReserve
(
data
->
offsetRows
,
1
);
offRows
->
vgId
=
pVg
->
vgId
;
offRows
->
rows
=
pVg
->
numOfRows
;
offRows
->
offset
=
pVg
->
offsetInfo
.
seek
Offset
;
offRows
->
offset
=
pVg
->
offsetInfo
.
begin
Offset
;
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
offRows
->
offset
);
tscInfo
(
"consumer:0x%"
PRIx64
",report offset: vgId:%d, offset:%s, rows:%"
PRId64
,
tmq
->
consumerId
,
offRows
->
vgId
,
buf
,
offRows
->
rows
);
...
...
@@ -1523,9 +1488,9 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
.
numOfRows
=
pInfo
?
pInfo
->
numOfRows
:
0
,
};
clientVg
.
offsetInfo
.
current
Offset
=
pInfo
?
pInfo
->
currentOffset
:
offsetNew
;
clientVg
.
offsetInfo
.
end
Offset
=
pInfo
?
pInfo
->
currentOffset
:
offsetNew
;
clientVg
.
offsetInfo
.
committedOffset
=
pInfo
?
pInfo
->
commitOffset
:
offsetNew
;
clientVg
.
offsetInfo
.
seek
Offset
=
pInfo
?
pInfo
->
seekOffset
:
offsetNew
;
clientVg
.
offsetInfo
.
begin
Offset
=
pInfo
?
pInfo
->
seekOffset
:
offsetNew
;
clientVg
.
offsetInfo
.
walVerBegin
=
-
1
;
clientVg
.
offsetInfo
.
walVerEnd
=
-
1
;
clientVg
.
seekUpdated
=
false
;
...
...
@@ -1581,11 +1546,11 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
makeTopicVgroupKey
(
vgKey
,
pTopicCur
->
topicName
,
pVgCur
->
vgId
);
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pVgCur
->
offsetInfo
.
current
Offset
);
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pVgCur
->
offsetInfo
.
end
Offset
);
tscInfo
(
"consumer:0x%"
PRIx64
", epoch:%d vgId:%d vgKey:%s, offset:%s"
,
tmq
->
consumerId
,
epoch
,
pVgCur
->
vgId
,
vgKey
,
buf
);
SVgroupSaveInfo
info
=
{.
currentOffset
=
pVgCur
->
offsetInfo
.
currentOffset
,
.
seekOffset
=
pVgCur
->
offsetInfo
.
seek
Offset
,
.
commitOffset
=
pVgCur
->
offsetInfo
.
committedOffset
,
.
numOfRows
=
pVgCur
->
numOfRows
};
SVgroupSaveInfo
info
=
{.
currentOffset
=
pVgCur
->
offsetInfo
.
endOffset
,
.
seekOffset
=
pVgCur
->
offsetInfo
.
begin
Offset
,
.
commitOffset
=
pVgCur
->
offsetInfo
.
committedOffset
,
.
numOfRows
=
pVgCur
->
numOfRows
};
taosHashPut
(
pVgOffsetHashMap
,
vgKey
,
strlen
(
vgKey
),
&
info
,
sizeof
(
SVgroupSaveInfo
));
}
}
...
...
@@ -1682,7 +1647,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
pReq
->
consumerId
=
tmq
->
consumerId
;
pReq
->
timeout
=
timeout
;
pReq
->
epoch
=
tmq
->
epoch
;
pReq
->
reqOffset
=
pVg
->
offsetInfo
.
current
Offset
;
pReq
->
reqOffset
=
pVg
->
offsetInfo
.
end
Offset
;
pReq
->
head
.
vgId
=
pVg
->
vgId
;
pReq
->
useSnapshot
=
tmq
->
useSnapshot
;
pReq
->
reqId
=
generateRequestId
();
...
...
@@ -1809,7 +1774,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
int64_t
transporterId
=
0
;
char
offsetFormatBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
offsetFormatBuf
,
tListLen
(
offsetFormatBuf
),
&
pVg
->
offsetInfo
.
current
Offset
);
tFormatOffset
(
offsetFormatBuf
,
tListLen
(
offsetFormatBuf
),
&
pVg
->
offsetInfo
.
end
Offset
);
tscDebug
(
"consumer:0x%"
PRIx64
" send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%"
PRIx64
,
pTmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
pTmq
->
epoch
,
offsetFormatBuf
,
req
.
reqId
);
...
...
@@ -1890,8 +1855,8 @@ static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* p
static
void
updateVgInfo
(
SMqClientVg
*
pVg
,
STqOffsetVal
*
reqOffset
,
STqOffsetVal
*
rspOffset
,
int64_t
sver
,
int64_t
ever
,
int64_t
consumerId
){
if
(
!
pVg
->
seekUpdated
)
{
tscDebug
(
"consumer:0x%"
PRIx64
" local offset is update, since seekupdate not set"
,
consumerId
);
pVg
->
offsetInfo
.
seek
Offset
=
*
reqOffset
;
pVg
->
offsetInfo
.
current
Offset
=
*
rspOffset
;
pVg
->
offsetInfo
.
begin
Offset
=
*
reqOffset
;
pVg
->
offsetInfo
.
end
Offset
=
*
rspOffset
;
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" local offset is NOT update, since seekupdate is set"
,
consumerId
);
}
...
...
@@ -2053,7 +2018,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tmq
->
totalRows
+=
numOfRows
;
char
buf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pVg
->
offsetInfo
.
current
Offset
);
tFormatOffset
(
buf
,
TSDB_OFFSET_LEN
,
&
pVg
->
offsetInfo
.
end
Offset
);
tscDebug
(
"consumer:0x%"
PRIx64
" process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%"
PRId64
", vg total:%"
PRId64
", total:%"
PRId64
", reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
pVg
->
vgId
,
buf
,
pollRspWrapper
->
dataRsp
.
blockNum
,
numOfRows
,
pVg
->
numOfRows
,
...
...
@@ -2315,7 +2280,7 @@ void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void*
if
(
pRes
==
NULL
)
{
// here needs to commit all offsets.
asyncCommitAllOffsets
(
tmq
,
cb
,
param
);
}
else
{
// only commit one offset
asyncCommit
Offset
(
tmq
,
pRes
,
TDMT_VND_TMQ_COMMIT_OFFSET
,
cb
,
param
);
asyncCommit
FromResult
(
tmq
,
pRes
,
cb
,
param
);
}
}
...
...
@@ -2335,7 +2300,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
if
(
pRes
==
NULL
)
{
asyncCommitAllOffsets
(
tmq
,
commitCallBackFn
,
pInfo
);
}
else
{
asyncCommit
Offset
(
tmq
,
pRes
,
TDMT_VND_TMQ_COMMIT_OFFSET
,
commitCallBackFn
,
pInfo
);
asyncCommit
FromResult
(
tmq
,
pRes
,
commitCallBackFn
,
pInfo
);
}
tsem_wait
(
&
pInfo
->
sem
);
...
...
@@ -2348,6 +2313,87 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
return
code
;
}
// wal range will be ok after calling tmq_get_topic_assignment or poll interface
static
bool
isWalRangeOk
(
SVgOffsetInfo
*
offset
){
if
(
offset
->
walVerBegin
!=
-
1
&&
offset
->
walVerEnd
!=
-
1
)
{
return
true
;
}
return
false
;
}
int32_t
tmq_commit_offset_sync
(
tmq_t
*
tmq
,
const
char
*
pTopicName
,
int32_t
vgId
,
int64_t
offset
){
if
(
tmq
==
NULL
||
pTopicName
==
NULL
)
{
tscError
(
"invalid tmq handle, null"
);
return
TSDB_CODE_INVALID_PARA
;
}
int32_t
accId
=
tmq
->
pTscObj
->
acctId
;
char
tname
[
TSDB_TOPIC_FNAME_LEN
]
=
{
0
};
sprintf
(
tname
,
"%d.%s"
,
accId
,
pTopicName
);
taosWLockLatch
(
&
tmq
->
lock
);
SMqClientTopic
*
pTopic
=
getTopicByName
(
tmq
,
tname
);
if
(
pTopic
==
NULL
)
{
tscError
(
"consumer:0x%"
PRIx64
" invalid topic name:%s"
,
tmq
->
consumerId
,
pTopicName
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_TMQ_INVALID_TOPIC
;
}
SMqClientVg
*
pVg
=
NULL
;
int32_t
numOfVgs
=
taosArrayGetSize
(
pTopic
->
vgs
);
for
(
int32_t
i
=
0
;
i
<
numOfVgs
;
++
i
)
{
SMqClientVg
*
pClientVg
=
taosArrayGet
(
pTopic
->
vgs
,
i
);
if
(
pClientVg
->
vgId
==
vgId
)
{
pVg
=
pClientVg
;
break
;
}
}
if
(
pVg
==
NULL
)
{
tscError
(
"consumer:0x%"
PRIx64
" invalid vgroup id:%d"
,
tmq
->
consumerId
,
vgId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_TMQ_INVALID_VGID
;
}
SVgOffsetInfo
*
pOffsetInfo
=
&
pVg
->
offsetInfo
;
if
(
!
isWalRangeOk
(
pOffsetInfo
))
{
tscError
(
"consumer:0x%"
PRIx64
" Assignment or poll interface need to be called first"
,
tmq
->
consumerId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_TMQ_NEED_INITIALIZED
;
}
if
(
offset
<
pOffsetInfo
->
walVerBegin
||
offset
>
pOffsetInfo
->
walVerEnd
)
{
tscError
(
"consumer:0x%"
PRIx64
" invalid seek params, offset:%"
PRId64
", valid range:[%"
PRId64
", %"
PRId64
"]"
,
tmq
->
consumerId
,
offset
,
pOffsetInfo
->
walVerBegin
,
pOffsetInfo
->
walVerEnd
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE
;
}
taosWUnLockLatch
(
&
tmq
->
lock
);
STqOffsetVal
offsetVal
=
{.
type
=
TMQ_OFFSET__LOG
,
.
version
=
offset
};
SSyncCommitInfo
*
pInfo
=
taosMemoryMalloc
(
sizeof
(
SSyncCommitInfo
));
if
(
pInfo
==
NULL
)
{
tscError
(
"consumer:0x%"
PRIx64
" failed to prepare seek operation"
,
tmq
->
consumerId
);
return
TSDB_CODE_OUT_OF_MEMORY
;
}
tsem_init
(
&
pInfo
->
sem
,
0
,
0
);
pInfo
->
code
=
0
;
asyncCommitOffset
(
tmq
,
tname
,
vgId
,
&
offsetVal
,
commitCallBackFn
,
pInfo
);
tsem_wait
(
&
pInfo
->
sem
);
int32_t
code
=
pInfo
->
code
;
tsem_destroy
(
&
pInfo
->
sem
);
taosMemoryFree
(
pInfo
);
tscInfo
(
"consumer:0x%"
PRIx64
" send seek to vgId:%d, offset:%"
PRId64
" code:%s"
,
tmq
->
consumerId
,
vgId
,
offset
,
tstrerror
(
code
));
return
code
;
}
void
updateEpCallbackFn
(
tmq_t
*
pTmq
,
int32_t
code
,
SDataBuf
*
pDataBuf
,
void
*
param
)
{
SAskEpInfo
*
pInfo
=
param
;
pInfo
->
code
=
code
;
...
...
@@ -2490,12 +2536,10 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
void
commitRspCountDown
(
SMqCommitCbParamSet
*
pParamSet
,
int64_t
consumerId
,
const
char
*
pTopic
,
int32_t
vgId
)
{
int32_t
waitingRspNum
=
atomic_sub_fetch_32
(
&
pParamSet
->
waitingRspNum
,
1
);
if
(
waitingRspNum
==
0
)
{
tscDebug
(
"consumer:0x%"
PRIx64
" topic:%s vgId:%d all commit-rsp received, commit completed"
,
consumerId
,
pTopic
,
vgId
);
tscInfo
(
"consumer:0x%"
PRIx64
" topic:%s vgId:%d all commit-rsp received, commit completed"
,
consumerId
,
pTopic
,
vgId
);
tmqCommitDone
(
pParamSet
);
}
else
{
tscDebug
(
"consumer:0x%"
PRIx64
" topic:%s vgId:%d commit-rsp received, remain:%d"
,
consumerId
,
pTopic
,
vgId
,
waitingRspNum
);
tscInfo
(
"consumer:0x%"
PRIx64
" topic:%s vgId:%d commit-rsp received, remain:%d"
,
consumerId
,
pTopic
,
vgId
,
waitingRspNum
);
}
}
...
...
@@ -2578,6 +2622,69 @@ static bool isInSnapshotMode(int8_t type, bool useSnapshot){
return
false
;
}
int64_t
tmq_position
(
tmq_t
*
tmq
,
const
char
*
pTopicName
,
int32_t
vgId
){
if
(
tmq
==
NULL
)
{
tscError
(
"invalid tmq handle, null"
);
return
TSDB_CODE_INVALID_PARA
;
}
int32_t
accId
=
tmq
->
pTscObj
->
acctId
;
char
tname
[
TSDB_TOPIC_FNAME_LEN
]
=
{
0
};
sprintf
(
tname
,
"%d.%s"
,
accId
,
pTopicName
);
taosWLockLatch
(
&
tmq
->
lock
);
SMqClientTopic
*
pTopic
=
getTopicByName
(
tmq
,
tname
);
if
(
pTopic
==
NULL
)
{
tscError
(
"consumer:0x%"
PRIx64
" invalid topic name:%s"
,
tmq
->
consumerId
,
pTopicName
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_TMQ_INVALID_TOPIC
;
}
SMqClientVg
*
pVg
=
NULL
;
int32_t
numOfVgs
=
taosArrayGetSize
(
pTopic
->
vgs
);
for
(
int32_t
i
=
0
;
i
<
numOfVgs
;
++
i
)
{
SMqClientVg
*
pClientVg
=
taosArrayGet
(
pTopic
->
vgs
,
i
);
if
(
pClientVg
->
vgId
==
vgId
)
{
pVg
=
pClientVg
;
break
;
}
}
if
(
pVg
==
NULL
)
{
tscError
(
"consumer:0x%"
PRIx64
" invalid vgroup id:%d"
,
tmq
->
consumerId
,
vgId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_TMQ_INVALID_VGID
;
}
int32_t
type
=
pVg
->
offsetInfo
.
endOffset
.
type
;
if
(
isInSnapshotMode
(
type
,
tmq
->
useSnapshot
))
{
tscError
(
"consumer:0x%"
PRIx64
" offset type:%d not wal version, position error"
,
tmq
->
consumerId
,
type
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_TMQ_SNAPSHOT_ERROR
;
}
if
(
!
isWalRangeOk
(
&
pVg
->
offsetInfo
))
{
tscError
(
"consumer:0x%"
PRIx64
" Assignment or poll interface need to be called first"
,
tmq
->
consumerId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_TMQ_NEED_INITIALIZED
;
}
int64_t
position
=
0
;
STqOffsetVal
*
pOffsetInfo
=
&
pVg
->
offsetInfo
.
endOffset
;
if
(
type
==
TMQ_OFFSET__LOG
){
position
=
pOffsetInfo
->
version
;
}
else
if
(
type
==
TMQ_OFFSET__RESET_EARLIEST
){
position
=
pVg
->
offsetInfo
.
walVerBegin
;
}
else
if
(
type
==
TMQ_OFFSET__RESET_LATEST
){
position
=
pVg
->
offsetInfo
.
walVerEnd
;
}
else
{
tscError
(
"consumer:0x%"
PRIx64
" offset type:%d can not be reach here"
,
tmq
->
consumerId
,
type
);
}
taosWUnLockLatch
(
&
tmq
->
lock
);
return
position
;
}
int32_t
tmq_get_topic_assignment
(
tmq_t
*
tmq
,
const
char
*
pTopicName
,
tmq_topic_assignment
**
assignment
,
int32_t
*
numOfAssignment
)
{
*
numOfAssignment
=
0
;
...
...
@@ -2585,7 +2692,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
SMqVgCommon
*
pCommon
=
NULL
;
int32_t
accId
=
tmq
->
pTscObj
->
acctId
;
char
tname
[
128
]
=
{
0
};
char
tname
[
TSDB_TOPIC_FNAME_LEN
]
=
{
0
};
sprintf
(
tname
,
"%d.%s"
,
accId
,
pTopicName
);
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -2600,7 +2707,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
*
numOfAssignment
=
taosArrayGetSize
(
pTopic
->
vgs
);
for
(
int32_t
j
=
0
;
j
<
(
*
numOfAssignment
);
++
j
)
{
SMqClientVg
*
pClientVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
int32_t
type
=
pClientVg
->
offsetInfo
.
seek
Offset
.
type
;
int32_t
type
=
pClientVg
->
offsetInfo
.
begin
Offset
.
type
;
if
(
isInSnapshotMode
(
type
,
tmq
->
useSnapshot
))
{
tscError
(
"consumer:0x%"
PRIx64
" offset type:%d not wal version, assignment not allowed"
,
tmq
->
consumerId
,
type
);
code
=
TSDB_CODE_TMQ_SNAPSHOT_ERROR
;
...
...
@@ -2620,13 +2727,13 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
for
(
int32_t
j
=
0
;
j
<
(
*
numOfAssignment
);
++
j
)
{
SMqClientVg
*
pClientVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
if
(
pClientVg
->
offsetInfo
.
seek
Offset
.
type
!=
TMQ_OFFSET__LOG
)
{
if
(
pClientVg
->
offsetInfo
.
begin
Offset
.
type
!=
TMQ_OFFSET__LOG
)
{
needFetch
=
true
;
break
;
}
tmq_topic_assignment
*
pAssignment
=
&
(
*
assignment
)[
j
];
pAssignment
->
currentOffset
=
pClientVg
->
offsetInfo
.
seek
Offset
.
version
;
pAssignment
->
currentOffset
=
pClientVg
->
offsetInfo
.
begin
Offset
.
version
;
pAssignment
->
begin
=
pClientVg
->
offsetInfo
.
walVerBegin
;
pAssignment
->
end
=
pClientVg
->
offsetInfo
.
walVerEnd
;
pAssignment
->
vgId
=
pClientVg
->
vgId
;
...
...
@@ -2665,7 +2772,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
SMqPollReq
req
=
{
0
};
tmqBuildConsumeReqImpl
(
&
req
,
tmq
,
10
,
pTopic
,
pClientVg
);
req
.
reqOffset
=
pClientVg
->
offsetInfo
.
seek
Offset
;
req
.
reqOffset
=
pClientVg
->
offsetInfo
.
begin
Offset
;
int32_t
msgSize
=
tSerializeSMqPollReq
(
NULL
,
0
,
&
req
);
if
(
msgSize
<
0
)
{
...
...
@@ -2705,7 +2812,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
int64_t
transporterId
=
0
;
char
offsetFormatBuf
[
TSDB_OFFSET_LEN
]
=
{
0
};
tFormatOffset
(
offsetFormatBuf
,
tListLen
(
offsetFormatBuf
),
&
pClientVg
->
offsetInfo
.
seek
Offset
);
tFormatOffset
(
offsetFormatBuf
,
tListLen
(
offsetFormatBuf
),
&
pClientVg
->
offsetInfo
.
begin
Offset
);
tscInfo
(
"consumer:0x%"
PRIx64
" %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%"
PRIx64
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pClientVg
->
vgId
,
tmq
->
epoch
,
offsetFormatBuf
,
req
.
reqId
);
...
...
@@ -2780,7 +2887,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
}
int32_t
accId
=
tmq
->
pTscObj
->
acctId
;
char
tname
[
128
]
=
{
0
};
char
tname
[
TSDB_TOPIC_FNAME_LEN
]
=
{
0
};
sprintf
(
tname
,
"%d.%s"
,
accId
,
pTopicName
);
taosWLockLatch
(
&
tmq
->
lock
);
...
...
@@ -2809,14 +2916,20 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
SVgOffsetInfo
*
pOffsetInfo
=
&
pVg
->
offsetInfo
;
int32_t
type
=
pOffsetInfo
->
current
Offset
.
type
;
int32_t
type
=
pOffsetInfo
->
end
Offset
.
type
;
if
(
isInSnapshotMode
(
type
,
tmq
->
useSnapshot
))
{
tscError
(
"consumer:0x%"
PRIx64
" offset type:%d not wal version, seek not allowed"
,
tmq
->
consumerId
,
type
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_TMQ_SNAPSHOT_ERROR
;
}
if
(
type
==
TMQ_OFFSET__LOG
&&
(
offset
<
pOffsetInfo
->
walVerBegin
||
offset
>
pOffsetInfo
->
walVerEnd
))
{
if
(
!
isWalRangeOk
(
&
pVg
->
offsetInfo
))
{
tscError
(
"consumer:0x%"
PRIx64
" Assignment or poll interface need to be called first"
,
tmq
->
consumerId
);
taosWUnLockLatch
(
&
tmq
->
lock
);
return
TSDB_CODE_TMQ_NEED_INITIALIZED
;
}
if
(
offset
<
pOffsetInfo
->
walVerBegin
||
offset
>
pOffsetInfo
->
walVerEnd
)
{
tscError
(
"consumer:0x%"
PRIx64
" invalid seek params, offset:%"
PRId64
", valid range:[%"
PRId64
", %"
PRId64
"]"
,
tmq
->
consumerId
,
offset
,
pOffsetInfo
->
walVerBegin
,
pOffsetInfo
->
walVerEnd
);
taosWUnLockLatch
(
&
tmq
->
lock
);
...
...
@@ -2824,10 +2937,9 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
}
// update the offset, and then commit to vnode
pOffsetInfo
->
currentOffset
.
type
=
TMQ_OFFSET__LOG
;
pOffsetInfo
->
currentOffset
.
version
=
offset
;
pOffsetInfo
->
seekOffset
=
pOffsetInfo
->
currentOffset
;
// pOffsetInfo->committedOffset.version = INT64_MIN;
pOffsetInfo
->
endOffset
.
type
=
TMQ_OFFSET__LOG
;
pOffsetInfo
->
endOffset
.
version
=
offset
;
pOffsetInfo
->
beginOffset
=
pOffsetInfo
->
endOffset
;
pVg
->
seekUpdated
=
true
;
tscInfo
(
"consumer:0x%"
PRIx64
" seek to %"
PRId64
" on vgId:%d"
,
tmq
->
consumerId
,
offset
,
vgId
);
...
...
source/util/src/terror.c
浏览文件 @
e812a659
...
...
@@ -631,6 +631,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SCALAR_CONVERT_ERROR, "Cannot convert to s
//tmq
TAOS_DEFINE_ERROR
(
TSDB_CODE_TMQ_INVALID_MSG
,
"Invalid message"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TMQ_NEED_INITIALIZED
,
"Assignment or poll interface need to be called first"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TMQ_SNAPSHOT_ERROR
,
"Can not operate in snapshot mode"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE
,
"Offset out of range"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TMQ_INVALID_VGID
,
"VgId does not belong to this consumer"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录