未验证 提交 4063cfbc 编写于 作者: H Haojun Liao 提交者: GitHub

Merge pull request #21053 from taosdata/feature/3_liaohj

refactor: do some internal refactor and add the sample code.
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "taos.h" #include "taos.h"
static int running = 1; static int running = 1;
const char* topic_name = "topicname";
static int32_t msg_process(TAOS_RES* msg) { static int32_t msg_process(TAOS_RES* msg) {
char buf[1024]; char buf[1024];
...@@ -243,7 +244,7 @@ _end: ...@@ -243,7 +244,7 @@ _end:
tmq_list_t* build_topic_list() { tmq_list_t* build_topic_list() {
tmq_list_t* topicList = tmq_list_new(); tmq_list_t* topicList = tmq_list_new();
int32_t code = tmq_list_append(topicList, "topicname"); int32_t code = tmq_list_append(topicList, topic_name);
if (code) { if (code) {
tmq_list_destroy(topicList); tmq_list_destroy(topicList);
return NULL; return NULL;
...@@ -269,6 +270,31 @@ void basic_consume_loop(tmq_t* tmq) { ...@@ -269,6 +270,31 @@ void basic_consume_loop(tmq_t* tmq) {
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
} }
void consume_repeatly(tmq_t* tmq) {
int32_t numOfAssignment = 0;
tmq_topic_assignment* pAssign = NULL;
int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment);
if (code != 0) {
fprintf(stderr, "failed to get assignment, reason:%s", tmq_err2str(code));
}
// seek to the earliest offset
for(int32_t i = 0; i < numOfAssignment; ++i) {
tmq_topic_assignment* p = &pAssign[i];
code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin);
if (code != 0) {
fprintf(stderr, "failed to seek to %ld, reason:%s", p->begin, tmq_err2str(code));
}
}
free(pAssign);
// let's do it again
basic_consume_loop(tmq);
}
int main(int argc, char* argv[]) { int main(int argc, char* argv[]) {
int32_t code; int32_t code;
...@@ -294,10 +320,13 @@ int main(int argc, char* argv[]) { ...@@ -294,10 +320,13 @@ int main(int argc, char* argv[]) {
if ((code = tmq_subscribe(tmq, topic_list))) { if ((code = tmq_subscribe(tmq, topic_list))) {
fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code)); fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code));
} }
tmq_list_destroy(topic_list); tmq_list_destroy(topic_list);
basic_consume_loop(tmq); basic_consume_loop(tmq);
consume_repeatly(tmq);
code = tmq_consumer_close(tmq); code = tmq_consumer_close(tmq);
if (code) { if (code) {
fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code)); fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code));
......
...@@ -263,7 +263,7 @@ DLL_EXPORT const char *tmq_err2str(int32_t code); ...@@ -263,7 +263,7 @@ DLL_EXPORT const char *tmq_err2str(int32_t code);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */ /* ------------------------TMQ CONSUMER INTERFACE------------------------ */
typedef struct tmq_topic_assignment { typedef struct tmq_topic_assignment {
int32_t vgroupHandle; int32_t vgId;
int64_t currentOffset; int64_t currentOffset;
int64_t begin; int64_t begin;
int64_t end; int64_t end;
...@@ -277,7 +277,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); ...@@ -277,7 +277,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq);
DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg);
DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param);
DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char* pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment); DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char* pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment);
DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char* pTopicName, int32_t vgroupHandle, int64_t offset); DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char* pTopicName, int32_t vgId, int64_t offset);
/* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */
......
...@@ -2357,7 +2357,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -2357,7 +2357,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) {
tmq_topic_assignment assignment = {.begin = pHead->walsver, tmq_topic_assignment assignment = {.begin = pHead->walsver,
.end = pHead->walever, .end = pHead->walever,
.currentOffset = rsp.rspOffset.version, .currentOffset = rsp.rspOffset.version,
.vgroupHandle = pParam->vgId}; .vgId = pParam->vgId};
taosThreadMutexLock(&pCommon->mutex); taosThreadMutexLock(&pCommon->mutex);
taosArrayPush(pCommon->pList, &assignment); taosArrayPush(pCommon->pList, &assignment);
...@@ -2422,7 +2422,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a ...@@ -2422,7 +2422,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->begin = pClientVg->offsetInfo.walVerBegin;
pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->end = pClientVg->offsetInfo.walVerEnd;
pAssignment->vgroupHandle = pClientVg->vgId; pAssignment->vgId = pClientVg->vgId;
} }
if (needFetch) { if (needFetch) {
...@@ -2524,7 +2524,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a ...@@ -2524,7 +2524,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
} }
} }
int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle, int64_t offset) { int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) {
if (tmq == NULL) { if (tmq == NULL) {
tscError("invalid tmq handle, null"); tscError("invalid tmq handle, null");
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
...@@ -2544,14 +2544,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle ...@@ -2544,14 +2544,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle
int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); int32_t numOfVgs = taosArrayGetSize(pTopic->vgs);
for (int32_t i = 0; i < numOfVgs; ++i) { for (int32_t i = 0; i < numOfVgs; ++i) {
SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i);
if (pClientVg->vgId == vgroupHandle) { if (pClientVg->vgId == vgId) {
pVg = pClientVg; pVg = pClientVg;
break; break;
} }
} }
if (pVg == NULL) { if (pVg == NULL) {
tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgroupHandle); tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
......
...@@ -1144,7 +1144,7 @@ TEST(clientCase, sub_tb_test) { ...@@ -1144,7 +1144,7 @@ TEST(clientCase, sub_tb_test) {
taos_free_result(pRes); taos_free_result(pRes);
} }
tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgroupHandle, pAssign[0].begin); tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, pAssign[0].begin);
} }
tmq_consumer_close(tmq); tmq_consumer_close(tmq);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册