From 06853043bd05ddccbd32b5fec71553aa73023fdf Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 12 May 2022 14:57:58 +0800 Subject: [PATCH] feat(tmq): add config msg.with.table.name --- example/src/tmq.c | 3 ++- include/common/tmsg.h | 1 + source/client/src/tmq.c | 39 ++++++++++++++++++++++++---------- source/dnode/vnode/src/tq/tq.c | 11 +++++++--- source/libs/wal/src/walRead.c | 8 ++----- 5 files changed, 41 insertions(+), 21 deletions(-) diff --git a/example/src/tmq.c b/example/src/tmq.c index 2ee91c254c..0b5f3be1b0 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -107,7 +107,7 @@ int32_t create_topic() { taos_free_result(pRes); /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column with table as select ts, c1, c2, c3 from st1"); + pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -166,6 +166,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); /*tmq_conf_set(conf, "td.connect.db", "abc1");*/ + tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); assert(tmq); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d34892a278..7bb1d70421 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2371,6 +2371,7 @@ typedef struct { typedef struct { SMsgHead head; char subKey[TSDB_SUBSCRIBE_KEY_LEN]; + int8_t withTbName; int32_t epoch; uint64_t reqId; int64_t consumerId; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index b42f072e54..a6b8b842f9 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -57,16 +57,17 @@ struct tmq_topic_vgroup_list_t { }; struct tmq_conf_t { - char clientId[256]; - char groupId[TSDB_CGROUP_LEN]; - int8_t autoCommit; - int8_t resetOffset; - uint16_t port; - int32_t autoCommitInterval; - char* ip; - char* user; - char* pass; - char* db; + char clientId[256]; + char groupId[TSDB_CGROUP_LEN]; + int8_t autoCommit; + int8_t resetOffset; + int8_t withTbName; + uint16_t port; + int32_t autoCommitInterval; + char* ip; + char* user; + char* pass; + /*char* db;*/ tmq_commit_cb* commitCb; void* commitCbUserParam; }; @@ -75,6 +76,7 @@ struct tmq_t { // conf char groupId[TSDB_CGROUP_LEN]; char clientId[256]; + int8_t withTbName; int8_t autoCommit; int32_t autoCommitInterval; int32_t resetOffsetCfg; @@ -187,6 +189,7 @@ typedef struct { tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); + conf->withTbName = -1; conf->autoCommit = true; conf->autoCommitInterval = 5000; conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; @@ -240,6 +243,18 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } + if (strcmp(key, "msg.with.table.name") == 0) { + if (strcmp(value, "true") == 0) { + conf->withTbName = 1; + } else if (strcmp(value, "false") == 0) { + conf->withTbName = 0; + } else if (strcmp(value, "none") == 0) { + conf->withTbName = -1; + } else { + return TMQ_CONF_INVALID; + } + } + if (strcmp(key, "td.connect.ip") == 0) { conf->ip = strdup(value); return TMQ_CONF_OK; @@ -257,7 +272,7 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value return TMQ_CONF_OK; } if (strcmp(key, "td.connect.db") == 0) { - conf->db = strdup(value); + /*conf->db = strdup(value);*/ return TMQ_CONF_OK; } @@ -485,6 +500,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); + pTmq->withTbName = conf->withTbName; pTmq->autoCommit = conf->autoCommit; pTmq->autoCommitInterval = conf->autoCommitInterval; pTmq->commitCb = conf->commitCb; @@ -1104,6 +1120,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t waitTime, SMqClientTopic* pReq->subKey[tlen] = TMQ_SEPARATOR; strcpy(pReq->subKey + tlen + 1, pTopic->topicName); + pReq->withTbName = tmq->withTbName; pReq->waitTime = waitTime; pReq->consumerId = tmq->consumerId; pReq->epoch = tmq->epoch; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 29fabc0f9f..6f0c25fb91 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -427,13 +427,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqDataBlkRsp rsp = {0}; rsp.reqOffset = pReq->currentOffset; rsp.withSchema = pExec->withSchema; - rsp.withTbName = pExec->withTbName; rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); rsp.blockSchema = taosArrayInit(0, sizeof(void*)); rsp.blockTbName = taosArrayInit(0, sizeof(void*)); + int8_t withTbName = pExec->withTbName; + if (pReq->withTbName != -1) { + withTbName = pReq->withTbName; + } + rsp.withTbName = withTbName; + while (1) { consumerEpoch = atomic_load_32(&pExec->epoch); if (consumerEpoch > reqEpoch) { @@ -538,7 +543,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { taosArrayPush(rsp.blockSchema, &pSW); } - if (pExec->withTbName) { + if (withTbName) { SMetaReader mr = {0}; metaReaderInit(&mr, pTq->pVnode->pMeta, 0); int64_t uid = pExec->pExecReader[workerId]->msgIter.uid; @@ -578,7 +583,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ASSERT(actualLen <= dataStrLen); taosArrayPush(rsp.blockDataLen, &actualLen); taosArrayPush(rsp.blockData, &buf); - if (pExec->withTbName) { + if (withTbName) { SMetaReader mr = {0}; metaReaderInit(&mr, pTq->pVnode->pMeta, 0); if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) { diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 7dfe1b8989..64e6881cd0 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -155,9 +155,7 @@ int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) { if (code < 0) return -1; } - if (!taosValidFile(pRead->pReadLogTFile)) { - return -1; - } + ASSERT(taosValidFile(pRead->pReadLogTFile) == true); code = taosReadFile(pRead->pReadLogTFile, pHead, sizeof(SWalHead)); if (code != sizeof(SWalHead)) { @@ -256,9 +254,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) { } } - /*if (!taosValidFile(pRead->pReadLogTFile)) {*/ - /*return -1;*/ - /*}*/ + ASSERT(taosValidFile(pRead->pReadLogTFile) == true); code = taosReadFile(pRead->pReadLogTFile, pRead->pHead, sizeof(SWalHead)); if (code != sizeof(SWalHead)) { -- GitLab