提交 fb59ec45 编写于 作者: wmmhello's avatar wmmhello

feat:move plan msg from vginfo to subscription info

上级 46b1a219
...@@ -559,12 +559,12 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer ...@@ -559,12 +559,12 @@ void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
char* qmsg; // SubPlanToString // char* qmsg; // SubPlanToString
SEpSet epSet; SEpSet epSet;
} SMqVgEp; } SMqVgEp;
SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp); //SMqVgEp* tCloneSMqVgEp(const SMqVgEp* pVgEp);
void tDeleteSMqVgEp(SMqVgEp* pVgEp); //void tDeleteSMqVgEp(SMqVgEp* pVgEp);
int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp); int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp);
void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp); void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp);
...@@ -589,6 +589,7 @@ typedef struct { ...@@ -589,6 +589,7 @@ typedef struct {
SHashObj* consumerHash; // consumerId -> SMqConsumerEp SHashObj* consumerHash; // consumerId -> SMqConsumerEp
SArray* unassignedVgs; // SArray<SMqVgEp*> SArray* unassignedVgs; // SArray<SMqVgEp*>
char dbName[TSDB_DB_FNAME_LEN]; char dbName[TSDB_DB_FNAME_LEN];
char* qmsg; // SubPlanToString
} SMqSubscribeObj; } SMqSubscribeObj;
SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]); SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
...@@ -687,12 +688,12 @@ int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); ...@@ -687,12 +688,12 @@ int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver);
void tFreeStreamObj(SStreamObj* pObj); void tFreeStreamObj(SStreamObj* pObj);
typedef struct { //typedef struct {
char streamName[TSDB_STREAM_FNAME_LEN]; // char streamName[TSDB_STREAM_FNAME_LEN];
int64_t uid; // int64_t uid;
int64_t streamUid; // int64_t streamUid;
SArray* childInfo; // SArray<SStreamChildEpInfo> // SArray* childInfo; // SArray<SStreamChildEpInfo>
} SStreamCheckpointObj; //} SStreamCheckpointObj;
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -644,7 +644,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { ...@@ -644,7 +644,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
SCMSubscribeReq subscribe = {0}; SCMSubscribeReq subscribe = {0};
tDeserializeSCMSubscribeReq(msgStr, &subscribe); tDeserializeSCMSubscribeReq(msgStr, &subscribe);
uint64_t consumerId = subscribe.consumerId; int64_t consumerId = subscribe.consumerId;
char *cgroup = subscribe.cgroup; char *cgroup = subscribe.cgroup;
SMqConsumerObj *pExistedConsumer = NULL; SMqConsumerObj *pExistedConsumer = NULL;
SMqConsumerObj *pConsumerNew = NULL; SMqConsumerObj *pConsumerNew = NULL;
......
...@@ -183,33 +183,33 @@ void tFreeStreamObj(SStreamObj *pStream) { ...@@ -183,33 +183,33 @@ void tFreeStreamObj(SStreamObj *pStream) {
} }
} }
SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) { //SMqVgEp *tCloneSMqVgEp(const SMqVgEp *pVgEp) {
SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp)); // SMqVgEp *pVgEpNew = taosMemoryMalloc(sizeof(SMqVgEp));
if (pVgEpNew == NULL) return NULL; // if (pVgEpNew == NULL) return NULL;
pVgEpNew->vgId = pVgEp->vgId; // pVgEpNew->vgId = pVgEp->vgId;
pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg); //// pVgEpNew->qmsg = taosStrdup(pVgEp->qmsg);
pVgEpNew->epSet = pVgEp->epSet; // pVgEpNew->epSet = pVgEp->epSet;
return pVgEpNew; // return pVgEpNew;
} //}
void tDeleteSMqVgEp(SMqVgEp *pVgEp) { //void tDeleteSMqVgEp(SMqVgEp *pVgEp) {
if (pVgEp) { // if (pVgEp) {
taosMemoryFreeClear(pVgEp->qmsg); //// taosMemoryFreeClear(pVgEp->qmsg);
taosMemoryFree(pVgEp); // taosMemoryFree(pVgEp);
} // }
} //}
int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) { int32_t tEncodeSMqVgEp(void **buf, const SMqVgEp *pVgEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pVgEp->vgId); tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
tlen += taosEncodeString(buf, pVgEp->qmsg); // tlen += taosEncodeString(buf, pVgEp->qmsg);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen; return tlen;
} }
void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) { void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
buf = taosDecodeFixedI32(buf, &pVgEp->vgId); buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
buf = taosDecodeString(buf, &pVgEp->qmsg); // buf = taosDecodeString(buf, &pVgEp->qmsg);
buf = taosDecodeSEpSet(buf, &pVgEp->epSet); buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
return (void *)buf; return (void *)buf;
} }
...@@ -382,13 +382,13 @@ SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) { ...@@ -382,13 +382,13 @@ SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp)); SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
if (pConsumerEpNew == NULL) return NULL; if (pConsumerEpNew == NULL) return NULL;
pConsumerEpNew->consumerId = pConsumerEpOld->consumerId; pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp); pConsumerEpNew->vgs = taosArrayDup(pConsumerEpOld->vgs, NULL);
return pConsumerEpNew; return pConsumerEpNew;
} }
void tDeleteSMqConsumerEp(void *data) { void tDeleteSMqConsumerEp(void *data) {
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)data;
taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); taosArrayDestroy(pConsumerEp->vgs);
} }
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) { int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
...@@ -463,12 +463,13 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) { ...@@ -463,12 +463,13 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
pConsumerEp = (SMqConsumerEp *)pIter; pConsumerEp = (SMqConsumerEp *)pIter;
SMqConsumerEp newEp = { SMqConsumerEp newEp = {
.consumerId = pConsumerEp->consumerId, .consumerId = pConsumerEp->consumerId,
.vgs = taosArrayDup(pConsumerEp->vgs, (__array_item_dup_fn_t)tCloneSMqVgEp), .vgs = taosArrayDup(pConsumerEp->vgs, NULL),
}; };
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp)); taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
} }
pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, (__array_item_dup_fn_t)tCloneSMqVgEp); pSubNew->unassignedVgs = taosArrayDup(pSub->unassignedVgs, NULL);
memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN); memcpy(pSubNew->dbName, pSub->dbName, TSDB_DB_FNAME_LEN);
pSubNew->qmsg = taosStrdup(pSub->qmsg);
return pSubNew; return pSubNew;
} }
...@@ -478,10 +479,11 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) { ...@@ -478,10 +479,11 @@ void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
pIter = taosHashIterate(pSub->consumerHash, pIter); pIter = taosHashIterate(pSub->consumerHash, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp); taosArrayDestroy(pConsumerEp->vgs);
} }
taosHashCleanup(pSub->consumerHash); taosHashCleanup(pSub->consumerHash);
taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp); taosArrayDestroy(pSub->unassignedVgs);
taosMemoryFreeClear(pSub->qmsg);
} }
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) { int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
......
...@@ -570,23 +570,19 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib ...@@ -570,23 +570,19 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId); mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
sdbRelease(pSdb, pVgroup);
}
if (pSubplan) { if (pSubplan) {
int32_t msgLen; int32_t msgLen;
pSubplan->execNode.epSet = pVgEp->epSet; if (qSubPlanToString(pSubplan, &pSub->qmsg, &msgLen) < 0) {
pSubplan->execNode.nodeId = pVgEp->vgId;
if (qSubPlanToString(pSubplan, &pVgEp->qmsg, &msgLen) < 0) {
sdbRelease(pSdb, pVgroup);
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
} else { } else {
pVgEp->qmsg = taosStrdup(""); pSub->qmsg = taosStrdup("");
}
sdbRelease(pSdb, pVgroup);
} }
qDestroyQueryPlan(pPlan); qDestroyQueryPlan(pPlan);
......
...@@ -99,13 +99,23 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj ...@@ -99,13 +99,23 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
return pSub; return pSub;
} }
static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscribeObj *pSub, static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg) { const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
SMqRebVgReq req = {0}; SMqRebVgReq req = {0};
req.oldConsumerId = pRebVg->oldConsumerId; req.oldConsumerId = pRebVg->oldConsumerId;
req.newConsumerId = pRebVg->newConsumerId; req.newConsumerId = pRebVg->newConsumerId;
req.vgId = pRebVg->pVgEp->vgId; req.vgId = pRebVg->pVgEp->vgId;
req.qmsg = pRebVg->pVgEp->qmsg; if(pPlan){
pPlan->execNode.epSet = pRebVg->pVgEp->epSet;
pPlan->execNode.nodeId = pRebVg->pVgEp->vgId;
int32_t msgLen;
if (qSubPlanToString(pPlan, &req.qmsg, &msgLen) < 0) {
terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1;
}
}else{
req.qmsg = taosStrdup("");
}
req.subType = pSub->subType; req.subType = pSub->subType;
req.withMeta = pSub->withMeta; req.withMeta = pSub->withMeta;
req.suid = pSub->stbUid; req.suid = pSub->stbUid;
...@@ -115,6 +125,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri ...@@ -115,6 +125,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
int32_t ret = 0; int32_t ret = 0;
tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret); tEncodeSize(tEncodeSMqRebVgReq, &req, tlen, ret);
if (ret < 0) { if (ret < 0) {
taosMemoryFree(req.qmsg);
return -1; return -1;
} }
...@@ -122,6 +133,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri ...@@ -122,6 +133,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
void *buf = taosMemoryMalloc(tlen); void *buf = taosMemoryMalloc(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(req.qmsg);
return -1; return -1;
} }
...@@ -135,17 +147,19 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri ...@@ -135,17 +147,19 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
if (tEncodeSMqRebVgReq(&encoder, &req) < 0) { if (tEncodeSMqRebVgReq(&encoder, &req) < 0) {
taosMemoryFreeClear(buf); taosMemoryFreeClear(buf);
tEncoderClear(&encoder); tEncoderClear(&encoder);
taosMemoryFree(req.qmsg);
return -1; return -1;
} }
tEncoderClear(&encoder); tEncoderClear(&encoder);
*pBuf = buf; *pBuf = buf;
*pLen = tlen; *pLen = tlen;
taosMemoryFree(req.qmsg);
return 0; return 0;
} }
static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, SMqSubscribeObj *pSub,
const SMqRebOutputVg *pRebVg) { const SMqRebOutputVg *pRebVg, SSubplan* pPlan) {
// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { // if (pRebVg->oldConsumerId == pRebVg->newConsumerId) {
// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; // terrno = TSDB_CODE_MND_INVALID_SUB_OPTION;
// return -1; // return -1;
...@@ -153,7 +167,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM ...@@ -153,7 +167,7 @@ static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SM
void *buf; void *buf;
int32_t tlen; int32_t tlen;
if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg) < 0) { if (mndBuildSubChangeReq(&buf, &tlen, pSub, pRebVg, pPlan) < 0) {
return -1; return -1;
} }
...@@ -483,14 +497,25 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR ...@@ -483,14 +497,25 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
} }
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) { static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
struct SSubplan* pPlan = NULL;
if(strcmp(pOutput->pSub->qmsg, "") != 0){
int32_t code = qStringToSubplan(pOutput->pSub->qmsg, &pPlan);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb"); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg, "tmq-reb");
if (pTrans == NULL) { if (pTrans == NULL) {
nodesDestroyNode((SNode*)pPlan);
return -1; return -1;
} }
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL); mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) { if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
nodesDestroyNode((SNode*)pPlan);
return -1; return -1;
} }
...@@ -500,11 +525,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu ...@@ -500,11 +525,13 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
int32_t vgNum = taosArrayGetSize(rebVgs); int32_t vgNum = taosArrayGetSize(rebVgs);
for (int32_t i = 0; i < vgNum; i++) { for (int32_t i = 0; i < vgNum; i++) {
SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i); SMqRebOutputVg *pRebVg = taosArrayGet(rebVgs, i);
if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg) < 0) { if (mndPersistSubChangeVgReq(pMnode, pTrans, pOutput->pSub, pRebVg, pPlan) < 0) {
mndTransDrop(pTrans); mndTransDrop(pTrans);
nodesDestroyNode((SNode*)pPlan);
return -1; return -1;
} }
} }
nodesDestroyNode((SNode*)pPlan);
// 2. redo log: subscribe and vg assignment // 2. redo log: subscribe and vg assignment
// subscribe // subscribe
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册