From fc99fe53de433d85d9cb643bdb1d4c223016a142 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 21 Jan 2022 23:19:54 +0800 Subject: [PATCH] consume skip ununsed table --- include/common/tmsg.h | 53 ++++++++++++++++++---------- source/client/src/clientImpl.c | 2 +- source/client/src/clientMsgHandler.c | 2 +- source/dnode/vnode/inc/vnode.h | 4 +-- source/dnode/vnode/src/tq/tq.c | 2 +- 5 files changed, 39 insertions(+), 24 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bb53c6ddfa..e12a46984f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1579,32 +1579,47 @@ typedef struct SMqSetCVgRsp { char cGroup[TSDB_CONSUMER_GROUP_LEN]; } SMqSetCVgRsp; -typedef struct SMqCVConsumeReq { +typedef struct SMqConsumeReq { int64_t reqId; int64_t offset; int64_t consumerId; int64_t blockingTime; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN]; -} SMqCVConsumeReq; +} SMqConsumeReq; -typedef struct SMqConsumeRspBlock { - int32_t bodyLen; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char body[]; -} SMqConsumeRspBlock; - -typedef struct SMqCVConsumeRsp { - int64_t reqId; - int64_t clientId; - int64_t committedOffset; - int64_t receiveOffset; - int64_t rspOffset; - int32_t skipLogNum; - int32_t bodyLen; - char topicName[TSDB_TOPIC_FNAME_LEN]; - SMqConsumeRspBlock blocks[]; -} SMqCvConsumeRsp; +typedef struct SMqColData { + int16_t colId; + int16_t type; + int16_t bytes; + char data[]; +} SMqColData; + +typedef struct SMqTbData { + int64_t uid; + int32_t numOfCols; + int32_t numOfRows; + SMqColData colData[]; +} SMqTbData; + +typedef struct SMqTopicBlk { + char topicName[TSDB_TOPIC_FNAME_LEN]; + int64_t committedOffset; + int64_t reqOffset; + int64_t rspOffset; + int32_t skipLogNum; + int32_t bodyLen; + int32_t numOfTb; + SMqTbData tbData[]; +} SMqTopicData; + +typedef struct SMqConsumeRsp { + int64_t reqId; + int64_t clientId; + int32_t bodyLen; + int32_t numOfTopics; + SMqTopicData data[]; +} SMqConsumeRsp; #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 159a92b0ab..9cc66d6879 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -117,7 +117,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); - p->pAppHbMgr = appHbMgrInit(p); + /*p->pAppHbMgr = appHbMgrInit(p);*/ taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); pInst = &p; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index ec088eb073..81ea18fe08 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -72,7 +72,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY}; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL); + /*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/ // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index ab538ff12d..9e6ecb6e23 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -68,7 +68,7 @@ typedef struct { typedef struct STqReadHandle { int64_t ver; - int64_t tbUid; + uint64_t tbUid; SSubmitMsg* pMsg; SSubmitBlk* pBlock; SSubmitMsgIter msgIter; @@ -204,7 +204,7 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SA pReadHandle->pColIdList = pColIdList; } -static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) { +static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t tbUid) { pHandle->tbUid = tbUid; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4a6f55564c..b18f50cd3f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -610,7 +610,7 @@ int tqItemSSize() { } int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { - SMqCVConsumeReq* pReq = pMsg->pCont; + SMqConsumeReq* pReq = pMsg->pCont; SRpcMsg rpcMsg; int64_t reqId = pReq->reqId; int64_t consumerId = pReq->consumerId; -- GitLab