提交 354da672 编写于 作者: L Liu Jicong

fix hb crash

上级 cc64e9e4
...@@ -234,6 +234,92 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) ...@@ -234,6 +234,92 @@ static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey)
return buf; return buf;
} }
typedef struct SMqHbVgInfo {
int32_t vgId;
} SMqHbVgInfo;
static FORCE_INLINE int taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgInfo->vgId);
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) {
buf = taosDecodeFixedI32(buf, &pVgInfo->vgId);
return buf;
}
typedef struct SMqHbTopicInfo {
int32_t epoch;
int64_t topicUid;
char name[TSDB_TOPIC_FNAME_LEN];
SArray* pVgInfo;
} SMqHbTopicInfo;
static FORCE_INLINE int taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch);
tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid);
tlen += taosEncodeString(buf, pTopicInfo->name);
int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i);
tlen += taosEncodeSMqVgInfo(buf, pVgInfo);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) {
buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch);
buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid);
buf = taosDecodeStringTo(buf, pTopicInfo->name);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo));
for (int32_t i = 0; i < sz; i++) {
SMqHbVgInfo vgInfo;
buf = taosDecodeSMqVgInfo(buf, &vgInfo);
taosArrayPush(pTopicInfo->pVgInfo, &vgInfo);
}
return buf;
}
typedef struct SMqHbMsg {
int32_t status; // ask hb endpoint
int32_t epoch;
int64_t consumerId;
SArray* pTopics; // SArray<SMqHbTopicInfo>
} SMqHbMsg;
static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
int tlen = 0;
tlen += taosEncodeFixedI32(buf, pMsg->status);
tlen += taosEncodeFixedI32(buf, pMsg->epoch);
tlen += taosEncodeFixedI64(buf, pMsg->consumerId);
int32_t sz = taosArrayGetSize(pMsg->pTopics);
tlen += taosEncodeFixedI32(buf, sz);
for (int i = 0; i < sz; i++) {
SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i);
tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
buf = taosDecodeFixedI32(buf, &pMsg->status);
buf = taosDecodeFixedI32(buf, &pMsg->epoch);
buf = taosDecodeFixedI64(buf, &pMsg->consumerId);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo));
for (int i = 0; i < sz; i++) {
SMqHbTopicInfo topicInfo;
buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo);
taosArrayPush(pMsg->pTopics, &topicInfo);
}
return buf;
}
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
...@@ -399,6 +485,66 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) { ...@@ -399,6 +485,66 @@ static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
return buf; return buf;
} }
typedef struct SMqHbOneTopicBatchRsp {
char topicName[TSDB_TOPIC_FNAME_LEN];
SArray* rsps; // SArray<SMqHbRsp>
} SMqHbOneTopicBatchRsp;
static FORCE_INLINE int taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) {
int tlen = 0;
tlen += taosEncodeString(buf, pBatchRsp->topicName);
int32_t sz = taosArrayGetSize(pBatchRsp->rsps);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i);
tlen += taosEncodeSMqHbRsp(buf, pRsp);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) {
int32_t sz;
buf = taosDecodeStringTo(buf, pBatchRsp->topicName);
buf = taosDecodeFixedI32(buf, &sz);
pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
for (int32_t i = 0; i < sz; i++) {
SMqHbRsp rsp;
buf = taosDecodeSMqHbRsp(buf, &rsp);
buf = taosArrayPush(pBatchRsp->rsps, &rsp);
}
return buf;
}
typedef struct SMqHbBatchRsp {
int64_t consumerId;
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
} SMqHbBatchRsp;
static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
int tlen = 0;
tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId);
int32_t sz;
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i);
tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
}
return tlen;
}
static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) {
buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId);
int32_t sz;
buf = taosDecodeFixedI32(buf, &sz);
pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
for (int32_t i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp rsp;
buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
buf = taosArrayPush(pBatchRsp->batchRsps, &rsp);
}
return buf;
}
typedef struct { typedef struct {
int32_t acctId; int32_t acctId;
int64_t clusterId; int64_t clusterId;
......
...@@ -290,8 +290,8 @@ typedef struct tmq_topic_vgroup_list_t { ...@@ -290,8 +290,8 @@ typedef struct tmq_topic_vgroup_list_t {
typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param)); typedef void (tmq_commit_cb(tmq_t*, tmq_resp_err_t, tmq_topic_vgroup_list_t*, void* param));
typedef struct tmq_conf_t{ typedef struct tmq_conf_t{
char* clientId; char groupId[256];
char* groupId; char clientId[256];
char* ip; char* ip;
uint16_t port; uint16_t port;
tmq_commit_cb* commit_cb; tmq_commit_cb* commit_cb;
...@@ -321,24 +321,27 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) { ...@@ -321,24 +321,27 @@ SArray* tmqGetConnInfo(SClientHbKey connKey, void* param) {
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
return NULL; return NULL;
} }
strcpy(kv.key, "groupId"); strcpy(kv.key, "mq-tmp");
kv.keyLen = strlen("groupId") + 1; kv.keyLen = strlen("mq-tmp") + 1;
kv.value = malloc(256); SMqHbMsg* pMqHb = malloc(sizeof(SMqHbMsg));
if (kv.value == NULL) { if (pMqHb == NULL) {
free(kv.key); return pArray;
taosArrayDestroy(pArray); }
return NULL; pMqHb->consumerId = connKey.connId;
SArray* clientTopics = pTmq->clientTopics;
int sz = taosArrayGetSize(clientTopics);
for (int i = 0; i < sz; i++) {
SMqClientTopic* pCTopic = taosArrayGet(clientTopics, i);
if (pCTopic->vgId == -1) {
pMqHb->status = 1;
break;
}
} }
strcpy(kv.value, pTmq->groupId); kv.value = pMqHb;
kv.valueLen = strlen(pTmq->groupId) + 1; kv.valueLen = sizeof(SMqHbMsg);
taosArrayPush(pArray, &kv); taosArrayPush(pArray, &kv);
strcpy(kv.key, "clientUid");
kv.keyLen = strlen("clientUid") + 1; return pArray;
*(uint32_t*)kv.value = pTmq->pTscObj->connId;
kv.valueLen = sizeof(uint32_t);
return NULL;
} }
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
...@@ -354,12 +357,12 @@ tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) { ...@@ -354,12 +357,12 @@ tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
return pTmq; return pTmq;
} }
TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) { TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) {
STscObj* pTscObj = (STscObj*)taos; STscObj* pTscObj = (STscObj*)taos;
SRequestObj* pRequest = NULL; SRequestObj* pRequest = NULL;
SQueryNode* pQuery = NULL; SQueryNode* pQueryNode = NULL;
SQueryDag* pDag = NULL; SQueryDag* pDag = NULL;
char *dagStr = NULL; char *pStr = NULL;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
if (taos == NULL || topicName == NULL || sql == NULL) { if (taos == NULL || topicName == NULL || sql == NULL) {
......
...@@ -92,7 +92,7 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) { ...@@ -92,7 +92,7 @@ int tSerializeSClientHbReq(void **buf, const SClientHbReq *pReq) {
int32_t kvNum = taosHashGetSize(pReq->info); int32_t kvNum = taosHashGetSize(pReq->info);
tlen += taosEncodeFixedI32(buf, kvNum); tlen += taosEncodeFixedI32(buf, kvNum);
SKv kv; SKv kv;
void* pIter = taosHashIterate(pReq->info, pIter); void* pIter = taosHashIterate(pReq->info, NULL);
while (pIter != NULL) { while (pIter != NULL) {
taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen); taosHashGetKey(pIter, &kv.key, (size_t *)&kv.keyLen);
kv.valueLen = taosHashGetDataLen(pIter); kv.valueLen = taosHashGetDataLen(pIter);
......
...@@ -346,19 +346,23 @@ typedef struct SMqTopicObj { ...@@ -346,19 +346,23 @@ typedef struct SMqTopicObj {
char *logicalPlan; char *logicalPlan;
char *physicalPlan; char *physicalPlan;
SHashObj *cgroups; // SHashObj<SMqCGroup> SHashObj *cgroups; // SHashObj<SMqCGroup>
SHashObj *consumers; // SHashObj<SMqConsumerObj>
} SMqTopicObj; } SMqTopicObj;
// TODO: add cache and change name to id // TODO: add cache and change name to id
typedef struct SMqConsumerTopic { typedef struct SMqConsumerTopic {
char name[TSDB_TOPIC_NAME_LEN]; int32_t epoch;
SList *vgroups; // SList<int32_t> char name[TSDB_TOPIC_NAME_LEN];
//TODO: replace with something with ep
SList *vgroups; // SList<int32_t>
} SMqConsumerTopic; } SMqConsumerTopic;
typedef struct SMqConsumerObj { typedef struct SMqConsumerObj {
SRWLatch lock;
int64_t consumerId; int64_t consumerId;
SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray *topics; // SArray<SMqConsumerTopic> SArray *topics; // SArray<SMqConsumerTopic>
SHashObj *topicHash;
} SMqConsumerObj; } SMqConsumerObj;
typedef struct SMqSubConsumerObj { typedef struct SMqSubConsumerObj {
......
...@@ -15,10 +15,13 @@ ...@@ -15,10 +15,13 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndProfile.h" #include "mndProfile.h"
#include "mndConsumer.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndTopic.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h"
#define QUERY_ID_SIZE 20 #define QUERY_ID_SIZE 20
#define QUERY_OBJ_ID_SIZE 18 #define QUERY_OBJ_ID_SIZE 18
...@@ -257,6 +260,68 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { ...@@ -257,6 +260,68 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) {
SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp));
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pRsp->connKey = pReq->connKey;
SMqHbBatchRsp batchRsp;
batchRsp.batchRsps = taosArrayInit(0, sizeof(SMqHbRsp));
if (batchRsp.batchRsps == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SClientHbKey connKey = pReq->connKey;
SHashObj* pObj = pReq->info;
SKv* pKv = taosHashGet(pObj, "mq-tmp", strlen("mq-tmp") + 1);
if (pKv == NULL) {
free(pRsp);
return NULL;
}
SMqHbMsg mqHb;
taosDecodeSMqMsg(pKv->value, &mqHb);
/*int64_t clientUid = htonl(pKv->value);*/
/*if (mqHb.epoch )*/
int sz = taosArrayGetSize(mqHb.pTopics);
SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, mqHb.consumerId);
for (int i = 0; i < sz; i++) {
SMqHbOneTopicBatchRsp innerBatchRsp;
innerBatchRsp.rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
if (innerBatchRsp.rsps == NULL) {
//TODO
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SMqHbTopicInfo* topicInfo = taosArrayGet(mqHb.pTopics, i);
SMqConsumerTopic* pConsumerTopic = taosHashGet(pConsumer->topicHash, topicInfo->name, strlen(topicInfo->name)+1);
if (pConsumerTopic->epoch != topicInfo->epoch) {
//add new vgids into rsp
int vgSz = taosArrayGetSize(topicInfo->pVgInfo);
for (int j = 0; j < vgSz; j++) {
SMqHbRsp innerRsp;
SMqHbVgInfo* pVgInfo = taosArrayGet(topicInfo->pVgInfo, i);
SVgObj* pVgObj = mndAcquireVgroup(pMnode, pVgInfo->vgId);
innerRsp.epSet = mndGetVgroupEpset(pMnode, pVgObj);
taosArrayPush(innerBatchRsp.rsps, &innerRsp);
}
}
taosArrayPush(batchRsp.batchRsps, &innerBatchRsp);
}
int32_t tlen = taosEncodeSMqHbBatchRsp(NULL, &batchRsp);
void* buf = malloc(tlen);
if (buf == NULL) {
//TODO
return NULL;
}
void* abuf = buf;
taosEncodeSMqHbBatchRsp(&abuf, &batchRsp);
pRsp->body = buf;
pRsp->bodyLen = tlen;
return pRsp;
}
static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
char *batchReqStr = pReq->rpcMsg.pCont; char *batchReqStr = pReq->rpcMsg.pCont;
...@@ -273,19 +338,17 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { ...@@ -273,19 +338,17 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) { if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_QUERY) {
} else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) { } else if (pHbReq->connKey.hbType == HEARTBEAT_TYPE_MQ) {
SClientHbRsp rsp = { SClientHbRsp *pRsp = mndMqHbBuildRsp(pMnode, pHbReq);
.status = 0, if (pRsp != NULL) {
.connKey = pHbReq->connKey, taosArrayPush(batchRsp.rsps, pRsp);
.bodyLen = 0, free(pRsp);
.body = NULL }
};
taosArrayPush(batchRsp.rsps, &rsp);
} }
} }
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
void* buf = rpcMallocCont(tlen); void* buf = rpcMallocCont(tlen);
void* bufCopy = buf; void* abuf = buf;
tSerializeSClientHbBatchRsp(&bufCopy, &batchRsp); tSerializeSClientHbBatchRsp(&abuf, &batchRsp);
pReq->contLen = tlen; pReq->contLen = tlen;
pReq->pCont = buf; pReq->pCont = buf;
return 0; return 0;
......
...@@ -121,11 +121,11 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) { ...@@ -121,11 +121,11 @@ TEST_F(MndTestProfile, 04_HeartBeatMsg) {
SClientHbBatchRsp rsp = {0}; SClientHbBatchRsp rsp = {0};
tDeserializeSClientHbBatchRsp(pRspChar, &rsp); tDeserializeSClientHbBatchRsp(pRspChar, &rsp);
int sz = taosArrayGetSize(rsp.rsps); int sz = taosArrayGetSize(rsp.rsps);
ASSERT_EQ(sz, 1); ASSERT_EQ(sz, 0);
SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0); //SClientHbRsp* pRsp = (SClientHbRsp*) taosArrayGet(rsp.rsps, 0);
EXPECT_EQ(pRsp->connKey.connId, 123); //EXPECT_EQ(pRsp->connKey.connId, 123);
EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ); //EXPECT_EQ(pRsp->connKey.hbType, HEARTBEAT_TYPE_MQ);
EXPECT_EQ(pRsp->status, 0); //EXPECT_EQ(pRsp->status, 0);
#if 0 #if 0
int32_t contLen = sizeof(SHeartBeatReq); int32_t contLen = sizeof(SHeartBeatReq);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册