提交 7ab2e76c 编写于 作者: L Liu Jicong

feat(tmq): support get tb name

上级 4a791097
...@@ -32,6 +32,11 @@ static void msg_process(TAOS_RES* msg) { ...@@ -32,6 +32,11 @@ static void msg_process(TAOS_RES* msg) {
int32_t numOfFields = taos_field_count(msg); int32_t numOfFields = taos_field_count(msg);
taos_print_row(buf, row, fields, numOfFields); taos_print_row(buf, row, fields, numOfFields);
printf("%s\n", buf); printf("%s\n", buf);
const char* tbName = tmq_get_table_name(msg);
if (tbName) {
printf("from tb: %s\n", tbName);
}
} }
} }
...@@ -101,8 +106,8 @@ int32_t create_topic() { ...@@ -101,8 +106,8 @@ int32_t create_topic() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create topic topic_ctb_column as abc1"); /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");*/ pRes = taos_query(pConn, "create topic topic_ctb_column with table as select ts, c1, c2, c3 from st1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -257,10 +257,7 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co ...@@ -257,10 +257,7 @@ DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_co
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
// TODO DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
#if 0
DLL_EXPORT char *tmq_get_table_name(TAOS_RES *res);
#endif
#if 0 #if 0
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
......
...@@ -2480,6 +2480,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp ...@@ -2480,6 +2480,10 @@ static FORCE_INLINE int32_t tEncodeSMqDataBlkRsp(void** buf, const SMqDataBlkRsp
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i); SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRsp->blockSchema, i);
tlen += taosEncodeSSchemaWrapper(buf, pSW); tlen += taosEncodeSSchemaWrapper(buf, pSW);
} }
if (pRsp->withTbName) {
char* tbName = (char*)taosArrayGetP(pRsp->blockTbName, i);
tlen += taosEncodeString(buf, tbName);
}
} }
} }
return tlen; return tlen;
...@@ -2492,6 +2496,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p ...@@ -2492,6 +2496,7 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeFixedI32(buf, &pRsp->blockNum); buf = taosDecodeFixedI32(buf, &pRsp->blockNum);
pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockData = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockDataLen = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockTbName = taosArrayInit(pRsp->blockNum, sizeof(void*));
pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*)); pRsp->blockSchema = taosArrayInit(pRsp->blockNum, sizeof(void*));
if (pRsp->blockNum != 0) { if (pRsp->blockNum != 0) {
buf = taosDecodeFixedI8(buf, &pRsp->withTbName); buf = taosDecodeFixedI8(buf, &pRsp->withTbName);
...@@ -2510,6 +2515,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p ...@@ -2510,6 +2515,11 @@ static FORCE_INLINE void* tDecodeSMqDataBlkRsp(const void* buf, SMqDataBlkRsp* p
buf = taosDecodeSSchemaWrapper(buf, pSW); buf = taosDecodeSSchemaWrapper(buf, pSW);
taosArrayPush(pRsp->blockSchema, &pSW); taosArrayPush(pRsp->blockSchema, &pSW);
} }
if (pRsp->withTbName) {
char* name = NULL;
buf = taosDecodeString(buf, &name);
taosArrayPush(pRsp->blockTbName, &name);
}
} }
} }
return (void*)buf; return (void*)buf;
......
...@@ -1346,3 +1346,16 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { ...@@ -1346,3 +1346,16 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
return -1; return -1;
} }
} }
const char* tmq_get_table_name(TAOS_RES* res) {
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
if (!pRspObj->rsp.withTbName || pRspObj->rsp.blockTbName == NULL || pRspObj->resIter < 0 ||
pRspObj->resIter >= pRspObj->rsp.blockNum) {
return NULL;
}
const char* name = taosArrayGetP(pRspObj->rsp.blockTbName, pRspObj->resIter);
return name;
}
return NULL;
}
...@@ -297,8 +297,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq ...@@ -297,8 +297,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj.ast = strdup(pCreate->ast); topicObj.ast = strdup(pCreate->ast);
topicObj.astLen = strlen(pCreate->ast) + 1; topicObj.astLen = strlen(pCreate->ast) + 1;
topicObj.subType = TOPIC_SUB_TYPE__TABLE; topicObj.subType = TOPIC_SUB_TYPE__TABLE;
topicObj.withTbName = 0; topicObj.withTbName = pCreate->withTbName;
topicObj.withSchema = 0; topicObj.withSchema = pCreate->withSchema;
SNode *pAst = NULL; SNode *pAst = NULL;
if (nodesStringToNode(pCreate->ast, &pAst) != 0) { if (nodesStringToNode(pCreate->ast, &pAst) != 0) {
......
...@@ -111,10 +111,10 @@ int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur); ...@@ -111,10 +111,10 @@ int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
// SMetaDB // SMetaDB
int metaOpenDB(SMeta* pMeta); int metaOpenDB(SMeta* pMeta);
void metaCloseDB(SMeta* pMeta); void metaCloseDB(SMeta* pMeta);
int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); // int metaSaveTableToDB(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid); int metaRemoveTableFromDb(SMeta* pMeta, tb_uid_t uid);
int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg); int metaSaveSmaToDB(SMeta* pMeta, STSma* pTbCfg);
int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
#endif #endif
#endif #endif
...@@ -123,4 +123,4 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); ...@@ -123,4 +123,4 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
} }
#endif #endif
#endif /*_TD_VNODE_META_H_*/ #endif /*_TD_VNODE_META_H_*/
\ No newline at end of file
...@@ -427,9 +427,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -427,9 +427,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
SMqDataBlkRsp rsp = {0}; SMqDataBlkRsp rsp = {0};
rsp.reqOffset = pReq->currentOffset; rsp.reqOffset = pReq->currentOffset;
rsp.withSchema = pExec->withSchema; rsp.withSchema = pExec->withSchema;
rsp.withTbName = pExec->withTbName;
rsp.blockData = taosArrayInit(0, sizeof(void*)); rsp.blockData = taosArrayInit(0, sizeof(void*));
rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t));
rsp.blockSchema = taosArrayInit(0, sizeof(void*)); rsp.blockSchema = taosArrayInit(0, sizeof(void*));
rsp.blockTbName = taosArrayInit(0, sizeof(void*));
while (1) { while (1) {
consumerEpoch = atomic_load_32(&pExec->epoch); consumerEpoch = atomic_load_32(&pExec->epoch);
...@@ -535,6 +538,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -535,6 +538,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayPush(rsp.blockSchema, &pSW); taosArrayPush(rsp.blockSchema, &pSW);
} }
if (pExec->withTbName) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
int64_t uid = pExec->pExecReader[workerId]->msgIter.uid;
if (metaGetTableEntryByUid(&mr, uid) < 0) {
ASSERT(0);
}
char* tbName = strdup(mr.me.name);
taosArrayPush(rsp.blockTbName, &tbName);
metaReaderClear(&mr);
}
rsp.blockNum++; rsp.blockNum++;
} }
// db subscribe // db subscribe
...@@ -563,6 +578,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -563,6 +578,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
ASSERT(actualLen <= dataStrLen); ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen); taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf); taosArrayPush(rsp.blockData, &buf);
if (pExec->withTbName) {
SMetaReader mr = {0};
metaReaderInit(&mr, pTq->pVnode->pMeta, 0);
if (metaGetTableEntryByUid(&mr, block.info.uid) < 0) {
ASSERT(0);
}
char* tbName = strdup(mr.me.name);
taosArrayPush(rsp.blockTbName, &tbName);
metaReaderClear(&mr);
}
SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper); SSchemaWrapper* pSW = tCloneSSchemaWrapper(pExec->pExecReader[workerId]->pSchemaWrapper);
taosArrayPush(rsp.blockSchema, &pSW); taosArrayPush(rsp.blockSchema, &pSW);
...@@ -614,6 +639,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -614,6 +639,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
taosArrayDestroy(rsp.blockData); taosArrayDestroy(rsp.blockData);
taosArrayDestroy(rsp.blockDataLen); taosArrayDestroy(rsp.blockDataLen);
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
taosArrayDestroyP(rsp.blockTbName, (FDelete)taosMemoryFree);
return 0; return 0;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册