提交 d7d81d82 编写于 作者: wmmhello's avatar wmmhello

feat:add committed & position & commite_offset interface

上级 5cb35f2f
......@@ -1127,7 +1127,7 @@ TEST(clientCase, tmq_commit) {
printf("position vgId:%d, position:%lld\n", pAssign[i].vgId, position);
tmq_offset_seek(tmq, topicName, pAssign[i].vgId, 1);
position = tmq_position(tmq, topicName, pAssign[i].vgId);
printf("after seek 100, position vgId:%d, position:%lld\n", pAssign[i].vgId, position);
printf("after seek 1, position vgId:%d, position:%lld\n", pAssign[i].vgId, position);
}
while (1) {
......@@ -1143,6 +1143,12 @@ TEST(clientCase, tmq_commit) {
for(int i = 0; i < numOfAssign; i++) {
int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId);
printf("committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed);
if(committed > 0){
int32_t code = tmq_commit_offset_sync(tmq, topicName, pAssign[i].vgId, 4);
printf("tmq_commit_offset_sync vgId:%d, offset:4, code:%d\n", pAssign[i].vgId, code);
int64_t committed = tmq_committed(tmq, topicName, pAssign[i].vgId);
printf("after tmq_commit_offset_sync, committed vgId:%d, committed:%lld\n", pAssign[i].vgId, committed);
}
}
if (pRes != NULL) {
taos_free_result(pRes);
......
......@@ -85,9 +85,9 @@ void tqDestroyTqHandle(void* data) {
}
}
static bool tqOffsetLessOrEqual(const STqOffset* pLeft, const STqOffset* pRight) {
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
return pLeft->val.type == TMQ_OFFSET__LOG && pRight->val.type == TMQ_OFFSET__LOG &&
pLeft->val.version <= pRight->val.version;
pLeft->val.version == pRight->val.version;
}
STQ* tqOpen(const char* path, SVnode* pVnode) {
......@@ -302,10 +302,10 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
STqOffset* pOffset = &vgOffset.offset;
if (pOffset->val.type == TMQ_OFFSET__SNAPSHOT_DATA || pOffset->val.type == TMQ_OFFSET__SNAPSHOT_META) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:snapshot) uid:%" PRId64 ", ts:%" PRId64,
pOffset->subKey, vgId, pOffset->val.uid, pOffset->val.ts);
} else if (pOffset->val.type == TMQ_OFFSET__LOG) {
tqDebug("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
tqInfo("receive offset commit msg to %s on vgId:%d, offset(type:log) version:%" PRId64, pOffset->subKey, vgId,
pOffset->val.version);
if (pOffset->val.version + 1 == sversion) {
pOffset->val.version += 1;
......@@ -316,8 +316,8 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, int64_t sversion, char* msg, int32_t
}
STqOffset* pSavedOffset = tqOffsetRead(pTq->pOffsetStore, pOffset->subKey);
if (pSavedOffset != NULL && tqOffsetLessOrEqual(pOffset, pSavedOffset)) {
tqDebug("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
if (pSavedOffset != NULL && tqOffsetEqual(pOffset, pSavedOffset)) {
tqInfo("not update the offset, vgId:%d sub:%s since committed:%" PRId64 " less than/equal to existed:%" PRId64,
vgId, pOffset->subKey, pOffset->val.version, pSavedOffset->val.version);
return 0; // no need to update the offset value
}
......@@ -605,7 +605,7 @@ int32_t tqProcessVgCommittedInfoReq(STQ* pTq, SRpcMsg* pMsg) {
return TSDB_CODE_INVALID_PARA;
}
void* buf = taosMemoryCalloc(1, len);
void* buf = rpcMallocCont(len);
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册