diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 83b40c4f20a332b94abe45dd5c4fb7afd820861a..94545dfaad943a3ec5e35ae6859213835d8c4540 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -20,7 +20,8 @@ #include #include "taos.h" -static int running = 1; +static int running = 1; +const char* topic_name = "topicname"; static int32_t msg_process(TAOS_RES* msg) { char buf[1024]; @@ -243,7 +244,7 @@ _end: tmq_list_t* build_topic_list() { tmq_list_t* topicList = tmq_list_new(); - int32_t code = tmq_list_append(topicList, "topicname"); + int32_t code = tmq_list_append(topicList, topic_name); if (code) { tmq_list_destroy(topicList); return NULL; @@ -269,6 +270,31 @@ void basic_consume_loop(tmq_t* tmq) { fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); } +void consume_repeatly(tmq_t* tmq) { + int32_t numOfAssignment = 0; + tmq_topic_assignment* pAssign = NULL; + + int32_t code = tmq_get_topic_assignment(tmq, topic_name, &pAssign, &numOfAssignment); + if (code != 0) { + fprintf(stderr, "failed to get assignment, reason:%s", tmq_err2str(code)); + } + + // seek to the earliest offset + for(int32_t i = 0; i < numOfAssignment; ++i) { + tmq_topic_assignment* p = &pAssign[i]; + + code = tmq_offset_seek(tmq, topic_name, p->vgId, p->begin); + if (code != 0) { + fprintf(stderr, "failed to seek to %ld, reason:%s", p->begin, tmq_err2str(code)); + } + } + + free(pAssign); + + // let's do it again + basic_consume_loop(tmq); +} + int main(int argc, char* argv[]) { int32_t code; @@ -294,10 +320,13 @@ int main(int argc, char* argv[]) { if ((code = tmq_subscribe(tmq, topic_list))) { fprintf(stderr, "Failed to tmq_subscribe(): %s\n", tmq_err2str(code)); } + tmq_list_destroy(topic_list); basic_consume_loop(tmq); + consume_repeatly(tmq); + code = tmq_consumer_close(tmq); if (code) { fprintf(stderr, "Failed to close consumer: %s\n", tmq_err2str(code)); diff --git a/include/client/taos.h b/include/client/taos.h index 05f3c5d11de38e742beb6520cd825ca69a9c394b..e46cd99c73c824a123cde51f3f4eea08ba380970 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -263,7 +263,7 @@ DLL_EXPORT const char *tmq_err2str(int32_t code); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ typedef struct tmq_topic_assignment { - int32_t vgroupHandle; + int32_t vgId; int64_t currentOffset; int64_t begin; int64_t end; @@ -277,7 +277,7 @@ DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char* pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment); -DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char* pTopicName, int32_t vgroupHandle, int64_t offset); +DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char* pTopicName, int32_t vgId, int64_t offset); /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 87c58ab32f4c8ef1dbb36ecfc8ead95bdce639fc..76fd1d84d0bec43ddea33172aecdcbdfa3ab68a3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2357,7 +2357,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { tmq_topic_assignment assignment = {.begin = pHead->walsver, .end = pHead->walever, .currentOffset = rsp.rspOffset.version, - .vgroupHandle = pParam->vgId}; + .vgId = pParam->vgId}; taosThreadMutexLock(&pCommon->mutex); taosArrayPush(pCommon->pList, &assignment); @@ -2422,7 +2422,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->end = pClientVg->offsetInfo.walVerEnd; - pAssignment->vgroupHandle = pClientVg->vgId; + pAssignment->vgId = pClientVg->vgId; } if (needFetch) { @@ -2524,7 +2524,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } } -int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle, int64_t offset) { +int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) { if (tmq == NULL) { tscError("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; @@ -2544,14 +2544,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgroupHandle int32_t numOfVgs = taosArrayGetSize(pTopic->vgs); for (int32_t i = 0; i < numOfVgs; ++i) { SMqClientVg* pClientVg = taosArrayGet(pTopic->vgs, i); - if (pClientVg->vgId == vgroupHandle) { + if (pClientVg->vgId == vgId) { pVg = pClientVg; break; } } if (pVg == NULL) { - tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgroupHandle); + tscError("consumer:0x%" PRIx64 " invalid vgroup id:%d", tmq->consumerId, vgId); return TSDB_CODE_INVALID_PARA; } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 08d62054e3b043f9bdbae3dcbd5f61f63758629d..f15a93cb2c300587bfffe880b8a45bf0aef60175 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1144,7 +1144,7 @@ TEST(clientCase, sub_tb_test) { taos_free_result(pRes); } - tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgroupHandle, pAssign[0].begin); + tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, pAssign[0].begin); } tmq_consumer_close(tmq);