diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bb53c6ddfaf815b297e9362b54f3223982a10df5..e12a46984f7723b86e1f2091b4400dc01c1010fb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1579,32 +1579,47 @@ typedef struct SMqSetCVgRsp { char cGroup[TSDB_CONSUMER_GROUP_LEN]; } SMqSetCVgRsp; -typedef struct SMqCVConsumeReq { +typedef struct SMqConsumeReq { int64_t reqId; int64_t offset; int64_t consumerId; int64_t blockingTime; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN]; -} SMqCVConsumeReq; +} SMqConsumeReq; -typedef struct SMqConsumeRspBlock { - int32_t bodyLen; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char body[]; -} SMqConsumeRspBlock; - -typedef struct SMqCVConsumeRsp { - int64_t reqId; - int64_t clientId; - int64_t committedOffset; - int64_t receiveOffset; - int64_t rspOffset; - int32_t skipLogNum; - int32_t bodyLen; - char topicName[TSDB_TOPIC_FNAME_LEN]; - SMqConsumeRspBlock blocks[]; -} SMqCvConsumeRsp; +typedef struct SMqColData { + int16_t colId; + int16_t type; + int16_t bytes; + char data[]; +} SMqColData; + +typedef struct SMqTbData { + int64_t uid; + int32_t numOfCols; + int32_t numOfRows; + SMqColData colData[]; +} SMqTbData; + +typedef struct SMqTopicBlk { + char topicName[TSDB_TOPIC_FNAME_LEN]; + int64_t committedOffset; + int64_t reqOffset; + int64_t rspOffset; + int32_t skipLogNum; + int32_t bodyLen; + int32_t numOfTb; + SMqTbData tbData[]; +} SMqTopicData; + +typedef struct SMqConsumeRsp { + int64_t reqId; + int64_t clientId; + int32_t bodyLen; + int32_t numOfTopics; + SMqTopicData data[]; +} SMqConsumeRsp; #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 159a92b0ab4c3ca1b26c05121a448ac2a30a1d94..9cc66d68794a086034c9af3be9050b471d13eeeb 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -117,7 +117,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); - p->pAppHbMgr = appHbMgrInit(p); + /*p->pAppHbMgr = appHbMgrInit(p);*/ taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES); pInst = &p; diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index ec088eb0735824f3348287ec54fd476f00dc7adb..81ea18fe08afa1ab70af3f06b994f167a9423dc4 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -72,7 +72,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) { atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1); SClientHbKey connKey = {.connId = pConnect->connId, .hbType = HEARTBEAT_TYPE_QUERY}; - hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL); + /*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connKey, NULL);*/ // pRequest->body.resInfo.pRspMsg = pMsg->pData; tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId, diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index ab538ff12d18bbfb64a9208b42230392993eabf5..9e6ecb6e23c5a7819651db1036cfdfdf323df636 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -68,7 +68,7 @@ typedef struct { typedef struct STqReadHandle { int64_t ver; - int64_t tbUid; + uint64_t tbUid; SSubmitMsg* pMsg; SSubmitBlk* pBlock; SSubmitMsgIter msgIter; @@ -204,7 +204,7 @@ static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SA pReadHandle->pColIdList = pColIdList; } -static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) { +static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t tbUid) { pHandle->tbUid = tbUid; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4a6f55564c95e54cf230235b2f32787de54558ad..b18f50cd3f24606212e002ebca43bd35b20eb9e3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -610,7 +610,7 @@ int tqItemSSize() { } int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { - SMqCVConsumeReq* pReq = pMsg->pCont; + SMqConsumeReq* pReq = pMsg->pCont; SRpcMsg rpcMsg; int64_t reqId = pReq->reqId; int64_t consumerId = pReq->consumerId;