提交 17639758 编写于 作者: L Liu Jicong

fix(tmq): apis adapt to tmq table meta

上级 3f6ce2da
......@@ -187,8 +187,8 @@ DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res);
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
DLL_EXPORT const char *taos_get_client_info();
DLL_EXPORT const char *taos_errstr(TAOS_RES *tres);
DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT const char *taos_errstr(TAOS_RES *res);
DLL_EXPORT int taos_errno(TAOS_RES *res);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
......
......@@ -404,8 +404,17 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_
int32_t code = -1;
if (msg != NULL) {
SMqRspObj* pRspObj = (SMqRspObj*)msg;
if (!TD_RES_TMQ(pRspObj)) {
char* topic;
int32_t vgId;
if (TD_RES_TMQ(msg)) {
SMqRspObj* pRspObj = (SMqRspObj*)msg;
topic = pRspObj->topic;
vgId = pRspObj->vgId;
} else if (TD_RES_TMQ_META(msg)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
topic = pMetaRspObj->topic;
vgId = pMetaRspObj->vgId;
} else {
return TSDB_CODE_TMQ_INVALID_MSG;
}
......@@ -424,10 +433,10 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_
for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (strcmp(pTopic->topicName, pRspObj->topic) == 0) {
if (strcmp(pTopic->topicName, topic) == 0) {
for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->vgId == pRspObj->vgId) {
if (pVg->vgId == vgId) {
if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) {
tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册