diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 40d72d3af1bee05b14e2b83dc2b67c12a833dadc..2e8aa21da7a2bdd83e4a995beccb99ac40228a48 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -24,6 +24,7 @@ static void msg_process(TAOS_RES* msg) { char buf[1024]; /*memset(buf, 0, 1024);*/ printf("topic: %s\n", tmq_get_topic_name(msg)); + printf("db: %s\n", tmq_get_db_name(msg)); printf("vg: %d\n", tmq_get_vgroup_id(msg)); while (1) { TAOS_ROW row = taos_fetch_row(msg); diff --git a/include/client/taos.h b/include/client/taos.h index bab0c18db17572a05c8a7d433876f48a404ded97..b65091f52bdd218138891970f079158033cb2d69 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -144,8 +144,8 @@ DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *nam DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags); DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); -DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields); -DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields); +DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields); +DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int *fieldNum, TAOS_FIELD_E **fields); DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); @@ -269,6 +269,7 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); +DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f332300a4f5d951826a7eb4334c06a4c45621d7c..21fed73f6d620c5c45202135faf01540c89d1fc4 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2203,10 +2203,8 @@ typedef struct { int64_t newConsumerId; char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int8_t subType; - // int8_t withTbName; - // int8_t withSchema; - // int8_t withTag; - char* qmsg; + char* qmsg; + int64_t suid; } SMqRebVgReq; static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pReq) { @@ -2217,11 +2215,10 @@ static FORCE_INLINE int32_t tEncodeSMqRebVgReq(void** buf, const SMqRebVgReq* pR tlen += taosEncodeFixedI64(buf, pReq->newConsumerId); tlen += taosEncodeString(buf, pReq->subKey); tlen += taosEncodeFixedI8(buf, pReq->subType); - // tlen += taosEncodeFixedI8(buf, pReq->withTbName); - // tlen += taosEncodeFixedI8(buf, pReq->withSchema); - // tlen += taosEncodeFixedI8(buf, pReq->withTag); if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { tlen += taosEncodeString(buf, pReq->qmsg); + } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { + tlen += taosEncodeFixedI64(buf, pReq->suid); } return tlen; } @@ -2233,11 +2230,10 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq) buf = taosDecodeFixedI64(buf, &pReq->newConsumerId); buf = taosDecodeStringTo(buf, pReq->subKey); buf = taosDecodeFixedI8(buf, &pReq->subType); - // buf = taosDecodeFixedI8(buf, &pReq->withTbName); - // buf = taosDecodeFixedI8(buf, &pReq->withSchema); - // buf = taosDecodeFixedI8(buf, &pReq->withTag); if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) { buf = taosDecodeString(buf, &pReq->qmsg); + } else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) { + buf = taosDecodeFixedI64(buf, &pReq->suid); } return (void*)buf; } @@ -2471,7 +2467,7 @@ static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) { typedef struct { char topic[TSDB_TOPIC_FNAME_LEN]; - int8_t isSchemaAdaptive; + char db[TSDB_DB_FNAME_LEN]; SArray* vgs; // SArray SSchemaWrapper schema; } SMqSubTopicEp; @@ -2479,7 +2475,7 @@ typedef struct { static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) { int32_t tlen = 0; tlen += taosEncodeString(buf, pTopicEp->topic); - tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive); + tlen += taosEncodeString(buf, pTopicEp->db); int32_t sz = taosArrayGetSize(pTopicEp->vgs); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { @@ -2492,7 +2488,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) { buf = taosDecodeStringTo(buf, pTopicEp->topic); - buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive); + buf = taosDecodeStringTo(buf, pTopicEp->db); int32_t sz; buf = taosDecodeFixedI32(buf, &sz); pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp)); diff --git a/include/util/tdef.h b/include/util/tdef.h index de139368c93c08a0fe3a9913a0a21e6925b95c7b..0ae22d195395f6225dddf33c52a99046ad41354d 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -209,7 +209,7 @@ typedef enum ELogicConditionType { #define TSDB_INDEX_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_INDEX_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TYPE_STR_MAX_LEN 32 #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) -#define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN +#define TSDB_TOPIC_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_STREAM_FNAME_LEN TSDB_TABLE_FNAME_LEN #define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CGROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) #define TSDB_PARTITION_KEY_LEN (TSDB_SUBSCRIBE_KEY_LEN + 20) diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index d5e07ce676b10b0c3962c0d6a0125062847d4ce9..0732c7890d19063ca2f48657926d7bbe245dc59e 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -191,6 +191,7 @@ typedef struct SRequestSendRecvBody { typedef struct { int8_t resType; char topic[TSDB_TOPIC_FNAME_LEN]; + char db[TSDB_DB_FNAME_LEN]; int32_t vgId; SSchemaWrapper schema; int32_t resIter; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 416d1a6f26f7d86506df1d2e0855da8e5bf71e3b..c2170631c2c90ca1d7322a4210f7763c6a703c57 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -143,6 +143,7 @@ typedef struct { typedef struct { // subscribe info char* topicName; + char db[TSDB_DB_FNAME_LEN]; SArray* vgs; // SArray @@ -1039,6 +1040,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { topic.schema = pTopicEp->schema; taosHashClear(pHash); topic.topicName = strdup(pTopicEp->topic); + tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN); tscDebug("consumer %ld update topic: %s", tmq->consumerId, topic.topicName); int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); @@ -1283,7 +1285,8 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); pRspObj->resType = RES_TYPE__TMQ; - strncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); + tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN); + tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN); pRspObj->vgId = pWrapper->vgHandle->vgId; pRspObj->resIter = -1; memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp)); @@ -1506,6 +1509,15 @@ const char* tmq_get_topic_name(TAOS_RES* res) { } } +const char* tmq_get_db_name(TAOS_RES* res) { + if (TD_RES_TMQ(res)) { + SMqRspObj* pRspObj = (SMqRspObj*)res; + return strchr(pRspObj->db, '.') + 1; + } else { + return NULL; + } +} + int32_t tmq_get_vgroup_id(TAOS_RES* res) { if (TD_RES_TMQ(res)) { SMqRspObj* pRspObj = (SMqRspObj*)res; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index d0c737ae5a7518cf864f55ab3f5c9702b7f60073..4bf7b15593ab37c864e41a3c042558b461282c19 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -168,7 +168,7 @@ typedef struct { int64_t createdTime; int64_t updateTime; SDnodeObj* pDnode; - SQnodeLoad load; + SQnodeLoad load; } SQnodeObj; typedef struct { @@ -422,6 +422,7 @@ typedef struct { char* ast; char* physicalPlan; SSchemaWrapper schema; + int64_t stbUid; // int32_t refConsumerCnt; } SMqTopicObj; @@ -535,7 +536,7 @@ typedef struct { } SMqRebOutputObj; typedef struct { - char name[TSDB_TOPIC_FNAME_LEN]; + char name[TSDB_STREAM_FNAME_LEN]; char sourceDb[TSDB_DB_FNAME_LEN]; char targetDb[TSDB_DB_FNAME_LEN]; char targetSTbName[TSDB_TABLE_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 0314891d59f38d5f8fdc4f92ecaca3f8c09bf2cd..1f8bf0699322ffdaad5c479b3c8fec3451645527 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -306,6 +306,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); ASSERT(pTopic); taosRLockLatch(&pTopic->lock); + tstrncpy(topicEp.db, pTopic->db, TSDB_DB_FNAME_LEN); topicEp.schema.nCols = pTopic->schema.nCols; if (topicEp.schema.nCols) { topicEp.schema.pSchema = taosMemoryCalloc(topicEp.schema.nCols, sizeof(SSchema)); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index fc736809fd16098388033553f4fec92fb2df6974..3c43998e858c1167a0e861eb479613e465d4e181 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -121,9 +121,6 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri req.vgId = pRebVg->pVgEp->vgId; req.qmsg = pRebVg->pVgEp->qmsg; req.subType = pSub->subType; - /*req.withTbName = pSub->withTbName;*/ - /*req.withSchema = pSub->withSchema;*/ - /*req.withTag = pSub->withTag;*/ strncpy(req.subKey, pSub->key, TSDB_SUBSCRIBE_KEY_LEN); int32_t tlen = sizeof(SMsgHead) + tEncodeSMqRebVgReq(NULL, &req); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 446992a24588e6b0c2b5bbadfe00a899dbc6cdc8..b364f25b9d5f62788bee15a226f0b6ee330a6843 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -375,13 +375,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * taosMemoryFree(topicObj.sql); return -1; } - /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/ - /*topicObj.ast = NULL;*/ - /*topicObj.astLen = 0;*/ - /*topicObj.physicalPlan = NULL;*/ - /*topicObj.withTbName = 1;*/ - /*topicObj.withSchema = 1;*/ + } else if (pCreate->subType == TOPIC_SUB_TYPE__TABLE) { } + /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/ + /*topicObj.ast = NULL;*/ + /*topicObj.astLen = 0;*/ + /*topicObj.physicalPlan = NULL;*/ + /*topicObj.withTbName = 1;*/ + /*topicObj.withSchema = 1;*/ STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq); if (pTrans == NULL) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e79de255f3cc99ea1fb75370ec3b87d35093625e..b8c79608f11053bb20c4032951f01f89d7612a9b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -264,14 +264,13 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { /*pExec->withSchema = req.withSchema;*/ /*pExec->withTag = req.withTag;*/ - pHandle->execHandle.exec.execCol.qmsg = req.qmsg; - req.qmsg = NULL; - pHandle->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); for (int32_t i = 0; i < 5; i++) { pHandle->execHandle.pExecReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); } if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + pHandle->execHandle.exec.execCol.qmsg = req.qmsg; + req.qmsg = NULL; for (int32_t i = 0; i < 5; i++) { SReadHandle handle = { .reader = pHandle->execHandle.pExecReader[i], @@ -286,6 +285,13 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { pHandle->execHandle.exec.execDb.pFilterOutTbUid = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK); } else if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__TABLE) { + int64_t suid = 0; + /*pHandle->execHandle.exec.execTb.suid = req.suid;*/ + SArray* tbUidList = taosArrayInit(0, sizeof(int16_t)); + tsdbGetAllTableList(pTq->pVnode->pMeta, suid, tbUidList); + for (int32_t i = 0; i < 5; i++) { + tqReadHandleSetTbUidList(pHandle->execHandle.pExecReader[i], tbUidList); + } } taosHashPut(pTq->handles, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); } else {