diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8527b29a2dae7fb51128c3587094050b6a2bafab..77e2f61c046851d5d70b451dd3f987551bc57ae2 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -234,6 +234,92 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) return buf; } +typedef struct SMqHbVgInfo { + int32_t vgId; +} SMqHbVgInfo; + +static FORCE_INLINE int taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pVgInfo->vgId); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) { + buf = taosDecodeFixedI32(buf, &pVgInfo->vgId); + return buf; +} + +typedef struct SMqHbTopicInfo { + int32_t epoch; + int64_t topicUid; + char name[TSDB_TOPIC_FNAME_LEN]; + SArray* pVgInfo; +} SMqHbTopicInfo; + +static FORCE_INLINE int taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch); + tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid); + tlen += taosEncodeString(buf, pTopicInfo->name); + int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i); + tlen += taosEncodeSMqVgInfo(buf, pVgInfo); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) { + buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch); + buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid); + buf = taosDecodeStringTo(buf, pTopicInfo->name); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo)); + for (int32_t i = 0; i < sz; i++) { + SMqHbVgInfo vgInfo; + buf = taosDecodeSMqVgInfo(buf, &vgInfo); + taosArrayPush(pTopicInfo->pVgInfo, &vgInfo); + } + return buf; +} + +typedef struct SMqHbMsg { + int32_t status; // ask hb endpoint + int32_t epoch; + int64_t consumerId; + SArray* pTopics; // SArray +} SMqHbMsg; + +static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pMsg->status); + tlen += taosEncodeFixedI32(buf, pMsg->epoch); + tlen += taosEncodeFixedI64(buf, pMsg->consumerId); + int32_t sz = taosArrayGetSize(pMsg->pTopics); + tlen += taosEncodeFixedI32(buf, sz); + for (int i = 0; i < sz; i++) { + SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i); + tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { + buf = taosDecodeFixedI32(buf, &pMsg->status); + buf = taosDecodeFixedI32(buf, &pMsg->epoch); + buf = taosDecodeFixedI64(buf, &pMsg->consumerId); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo)); + for (int i = 0; i < sz; i++) { + SMqHbTopicInfo topicInfo; + buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo); + taosArrayPush(pMsg->pTopics, &topicInfo); + } + return buf; +} typedef struct { int32_t vgId; @@ -399,6 +485,66 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) { return buf; } +typedef struct SMqHbOneTopicBatchRsp { + char topicName[TSDB_TOPIC_FNAME_LEN]; + SArray* rsps; // SArray +} SMqHbOneTopicBatchRsp; + +static FORCE_INLINE int taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) { + int tlen = 0; + tlen += taosEncodeString(buf, pBatchRsp->topicName); + int32_t sz = taosArrayGetSize(pBatchRsp->rsps); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i); + tlen += taosEncodeSMqHbRsp(buf, pRsp); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) { + int32_t sz; + buf = taosDecodeStringTo(buf, pBatchRsp->topicName); + buf = taosDecodeFixedI32(buf, &sz); + pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp)); + for (int32_t i = 0; i < sz; i++) { + SMqHbRsp rsp; + buf = taosDecodeSMqHbRsp(buf, &rsp); + buf = taosArrayPush(pBatchRsp->rsps, &rsp); + } + return buf; +} + +typedef struct SMqHbBatchRsp { + int64_t consumerId; + SArray* batchRsps; // SArray +} SMqHbBatchRsp; + +static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) { + int tlen = 0; + tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId); + int32_t sz; + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i); + tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) { + buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp)); + for (int32_t i = 0; i < sz; i++) { + SMqHbOneTopicBatchRsp rsp; + buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp); + buf = taosArrayPush(pBatchRsp->batchRsps, &rsp); + } + return buf; +} + typedef struct { int32_t acctId; int64_t clusterId; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 87cb6766e377f195ee5c8aa53e6446e790acb566..0c7626c5af9a1811f0c15598ee242fbaf7a381c1 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -290,8 +290,8 @@ typedef struct tmq_topic_vgroup_list_t { typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param)); typedef struct tmq_conf_t{ - char* clientId; - char* groupId; + char groupId[256]; + char clientId[256]; char* ip; uint16_t port; tmq_commit_cb* commit_cb; @@ -321,24 +321,27 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { taosArrayDestroy(pArray); return NULL; } - strcpy(kv.key, "groupId"); - kv.keyLen = strlen("groupId") + 1; - kv.value = malloc(256); - if (kv.value == NULL) { - free(kv.key); - taosArrayDestroy(pArray); - return NULL; + strcpy(kv.key, "mq-tmp"); + kv.keyLen = strlen("mq-tmp") + 1; + SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg)); + if (pMqHb == NULL) { + return pArray; + } + pMqHb->consumerId = connKey.connId; + SArray* clientTopics = pTmq->clientTopics; + int sz = taosArrayGetSize(clientTopics); + for (int i = 0; i < sz; i++) { + SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i); + if (pCTopic->vgId == -1) { + pMqHb->status = 1; + break; + } } - strcpy(kv.value, pTmq->groupId); - kv.valueLen = strlen(pTmq->groupId) + 1; - + kv.value = pMqHb; + kv.valueLen = sizeof(SMqHbMsg); taosArrayPush(pArray, &kv); - strcpy(kv.key, "clientUid"); - kv.keyLen = strlen("clientUid") + 1; - *(uint32_t*)kv.value = pTmq->pTscObj->connId; - kv.valueLen = sizeof(uint32_t); - - return NULL; + + return pArray; } tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { @@ -354,12 +357,12 @@ tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { return pTmq; } -TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) { +TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { STscObj* pTscObj = (STscObj*)taos; SRequestObj* pRequest = NULL; - SQueryNode* pQuery = NULL; + SQueryNode* pQueryNode = NULL; SQueryDag* pDag = NULL; - char *dagStr = NULL; + char *pStr = NULL; terrno = TSDB_CODE_SUCCESS; if (taos == NULL || topicName == NULL || sql == NULL) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 4d1a07be217fb942317f095cebec5091835feb11..48e9dce3c14eafb4b4c81d17d445767236224785 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -92,7 +92,7 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { int32_t kvNum = taosHashGetSize(pReq->info); tlen += taosEncodeFixedI32(buf, kvNum); SKv kv; - void* pIter = taosHashIterate(pReq->info, pIter); + void* pIter = taosHashIterate(pReq->info, NULL); while (pIter != NULL) { taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); kv.valueLen = taosHashGetDataLen(pIter); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 0c2524f48c375b3432d1f7931fefe0e87b302ef0..de101b0f069f93b9ed05b6b9c403b85b5a45ecd5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -346,19 +346,23 @@ typedef struct SMqTopicObj { char *logicalPlan; char *physicalPlan; SHashObj *cgroups; // SHashObj + SHashObj *consumers; // SHashObj } SMqTopicObj; // TODO: add cache and change name to id typedef struct SMqConsumerTopic { - char name[TSDB_TOPIC_NAME_LEN]; - SList *vgroups; // SList + int32_t epoch; + char name[TSDB_TOPIC_NAME_LEN]; + //TODO: replace with something with ep + SList *vgroups; // SList } SMqConsumerTopic; typedef struct SMqConsumerObj { - SRWLatch lock; int64_t consumerId; + SRWLatch lock; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray *topics; // SArray + SHashObj *topicHash; } SMqConsumerObj; typedef struct SMqSubConsumerObj { diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 79e5d9eae595eb95be701c3f0da8518cbd55f660..a40302af46f39e0a896123ffc2ab028c64170e91 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -15,10 +15,13 @@ #define _DEFAULT_SOURCE #include "mndProfile.h" +#include "mndConsumer.h" #include "mndDb.h" #include "mndMnode.h" #include "mndShow.h" +#include "mndTopic.h" #include "mndUser.h" +#include "mndVgroup.h" #define QUERY_ID_SIZE 20 #define QUERY_OBJ_ID_SIZE 18 @@ -257,6 +260,68 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { return TSDB_CODE_SUCCESS; } +static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) { + SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp)); + if (pRsp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + pRsp->connKey = pReq->connKey; + SMqHbBatchRsp batchRsp; + batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp)); + if (batchRsp.batchRsps == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + SClientHbKey connKey = pReq->connKey; + SHashObj* pObj = pReq->info; + SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1); + if (pKv == NULL) { + free(pRsp); + return NULL; + } + SMqHbMsg mqHb; + taosDecodeSMqMsg(pKv->value, &mqHb); + /*int64_t clientUid = htonl(pKv->value);*/ + /*if (mqHb.epoch )*/ + int sz = taosArrayGetSize(mqHb.pTopics); + SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId); + for (int i = 0; i < sz; i++) { + SMqHbOneTopicBatchRsp innerBatchRsp; + innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp)); + if (innerBatchRsp.rsps == NULL) { + //TODO + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i); + SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1); + if (pConsumerTopic->epoch != topicInfo->epoch) { + //add new vgids into rsp + int vgSz = taosArrayGetSize(topicInfo->pVgInfo); + for (int j = 0; j < vgSz; j++) { + SMqHbRsp innerRsp; + SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i); + SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId); + innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj); + taosArrayPush(innerBatchRsp.rsps, &innerRsp); + } + } + taosArrayPush(batchRsp.batchRsps, &innerBatchRsp); + } + int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp); + void* buf = malloc(tlen); + if (buf == NULL) { + //TODO + return NULL; + } + void* abuf = buf; + taosEncodeSMqHbBatchRsp(&abuf, &batchRsp); + pRsp->body = buf; + pRsp->bodyLen = tlen; + return pRsp; +} + static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { SMnode *pMnode = pReq->pMnode; char *batchReqStr = pReq->rpcMsg.pCont; @@ -273,19 +338,17 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { - SClientHbRsp rsp = { - .status = 0, - .connKey = pHbReq->connKey, - .bodyLen = 0, - .body = NULL - }; - taosArrayPush(batchRsp.rsps, &rsp); + SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq); + if (pRsp != NULL) { + taosArrayPush(batchRsp.rsps, pRsp); + free(pRsp); + } } } int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); void* buf = rpcMallocCont(tlen); - void* bufCopy = buf; - tSerializeSClientHbBatchRsp(&bufCopy, &batchRsp); + void* abuf = buf; + tSerializeSClientHbBatchRsp(&abuf, &batchRsp); pReq->contLen = tlen; pReq->pCont = buf; return 0; diff --git a/source/dnode/mnode/impl/test/profile/profile.cpp b/source/dnode/mnode/impl/test/profile/profile.cpp index 4b329886eba63cfd4a1864a4796c73db604fbef0..4ad979cdd33a8e4d128737eb8697f47314328c41 100644 --- a/source/dnode/mnode/impl/test/profile/profile.cpp +++ b/source/dnode/mnode/impl/test/profile/profile.cpp @@ -121,11 +121,11 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) { SClientHbBatchRsp rsp = {0}; tDeserializeSClientHbBatchRsp(pRspChar, &rsp); int sz = taosArrayGetSize(rsp.rsps); - ASSERT_EQ(sz, 1); - SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0); - EXPECT_EQ(pRsp->connKey.connId, 123); - EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ); - EXPECT_EQ(pRsp->status, 0); + ASSERT_EQ(sz, 0); + //SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0); + //EXPECT_EQ(pRsp->connKey.connId, 123); + //EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ); + //EXPECT_EQ(pRsp->status, 0); #if 0 int32_t contLen = sizeof(SHeartBeatReq);