From 573a86ed99669f04d1a87d52e3a573a4eefd896a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 May 2023 09:51:00 +0800 Subject: [PATCH] feature(tmq): add new API to extract offset from result set. --- include/client/taos.h | 1 + source/client/src/clientTmq.c | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/include/client/taos.h b/include/client/taos.h index d9fd1ca1b8..8811c4ab64 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -310,6 +310,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); +DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); /* ------------------------------ TAOSX -----------------------------------*/ // note: following apis are unstable diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 87aee4a8a3..63e8b3097c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -2109,6 +2109,29 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { } } +int64_t tmq_get_vgroup_offset(TAOS_RES* res) { + if (TD_RES_TMQ(res)) { + SMqRspObj* pRspObj = (SMqRspObj*) res; + STqOffsetVal* pOffset = &pRspObj->rsp.rspOffset; + if (pOffset->type == TMQ_OFFSET__LOG) { + return pRspObj->rsp.rspOffset.version; + } + } else if (TD_RES_TMQ_META(res)) { + SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res; + if (pRspObj->metaRsp.rspOffset.type == TMQ_OFFSET__LOG) { + return pRspObj->metaRsp.rspOffset.version; + } + } else if (TD_RES_TMQ_METADATA(res)) { + SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*) res; + if (pRspObj->rsp.rspOffset.type == TMQ_OFFSET__LOG) { + return pRspObj->rsp.rspOffset.version; + } + } + + // data from tsdb, no valid offset info + return -1; +} + const char* tmq_get_table_name(TAOS_RES* res) { if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; -- GitLab