diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 94545dfaad943a3ec5e35ae6859213835d8c4540..e1133c109e72873ddcf4b89a818d878d56a628f1 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -73,7 +73,7 @@ static int32_t init_env() { taos_free_result(pRes); // create database - pRes = taos_query(pConn, "create database tmqdb precision 'ns'"); + pRes = taos_query(pConn, "create database tmqdb precision 'ns' WAL_RETENTION_PERIOD 3600"); if (taos_errno(pRes) != 0) { printf("error in create tmqdb, reason:%s\n", taos_errstr(pRes)); goto END; @@ -289,7 +289,7 @@ void consume_repeatly(tmq_t* tmq) { } } - free(pAssign); + tmq_free_assignment(pAssign); // let's do it again basic_consume_loop(tmq); diff --git a/include/client/taos.h b/include/client/taos.h index 8811c4ab64e3ae65085c26ccd791705a98541423..9f5a84fb2267be832fe8bf61fb9648ea28212b60 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -288,6 +288,7 @@ DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); DLL_EXPORT int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment); +DLL_EXPORT void tmq_free_assignment(tmq_topic_assignment* pAssignment); DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t vgId, int64_t offset); /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c38e206a9bd3b4428727edc3042d3e2a8a398022..aff5846092348f5ec7e77938e7a0f11c764affe4 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2584,6 +2584,14 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } } +void tmq_free_assignment(tmq_topic_assignment* pAssignment) { + if (pAssignment == NULL) { + return; + } + + taosMemoryFree(pAssignment); +} + int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) { if (tmq == NULL) { tscError("invalid tmq handle, null");