提交 963fb9b9 编写于 作者: L Liu Jicong

fix mq rebalance

上级 8a180a33
......@@ -628,6 +628,10 @@ struct tmq_message_t {
};
int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
if (code == -1) {
printf("discard\n");
return 0;
}
SMqClientVg* pVg = (SMqClientVg*)param;
SMqConsumeRsp rsp;
tDecodeSMqConsumeRsp(pMsg->pData, &rsp);
......@@ -680,6 +684,8 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = pParam->tmq;
if (code != 0) {
printf("exit wait %d\n", pParam->wait);
if (pParam->wait) {
tsem_post(&tmq->rspSem);
}
......@@ -691,9 +697,9 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp);
int32_t sz = taosArrayGetSize(rsp.topics);
// TODO: lock
printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);
printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);
if (rsp.epoch != tmq->epoch) {
/*printf("rsp epoch %ld", rsp.epoch);*/
/*printf("tmq epoch %ld", tmq->epoch);*/
//TODO
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
......
......@@ -104,9 +104,6 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
if (found == 0) {
taosArrayPush(pSub->availConsumer, &consumerId);
}
SSdbRaw* pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
sdbWriteNotFree(pMnode->pSdb, pRaw);
int32_t assignedSz = taosArrayGetSize(pSub->assigned);
topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp));
......@@ -126,7 +123,9 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
taosArrayPush(rsp.topics, &topicEp);
}
if (changed || found) {
SSdbRaw* pRaw = mndSubActionEncode(pSub);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
sdbWriteNotFree(pMnode->pSdb, pRaw);
}
mndReleaseSubscribe(pMnode, pSub);
}
......@@ -183,12 +182,15 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
}
// TODO: acquire consumer, set status to unavail
}
#if 0
SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId);
pConsumer->epoch++;
printf("current epoch %ld size %ld", pConsumer->epoch, pConsumer->topics->size);
SSdbRaw* pRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
sdbWriteNotFree(pMnode->pSdb, pRaw);
mndReleaseConsumer(pMnode, pConsumer);
#endif
}
}
if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0 && taosArrayGetSize(pSub->availConsumer) > 0) {
......@@ -207,6 +209,13 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
taosArrayPush(pSub->assigned, pCEp);
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId);
pConsumer->epoch++;
/*SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
/*sdbWriteNotFree(pMnode->pSdb, pConsumerRaw);*/
mndReleaseConsumer(pMnode, pConsumer);
// build msg
SMqSetCVgReq req = {0};
......@@ -216,8 +225,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
req.logicalPlan = pTopic->logicalPlan;
req.physicalPlan = pTopic->physicalPlan;
req.qmsg = pCEp->qmsg;
req.newConsumerId = consumerId;
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *buf = malloc(tlen);
void *buf = malloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -262,6 +272,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
SArray *pArray;
SArray *inner = taosArrayGet(pDag->pSubplans, 0);
SSubplan *plan = taosArrayGetP(inner, 0);
plan->execNode.inUse = 0;
strcpy(plan->execNode.epAddr[0].fqdn, "localhost");
plan->execNode.epAddr[0].port = 6030;
......
......@@ -71,6 +71,7 @@ typedef struct {
typedef struct STqReadHandle {
int64_t ver;
uint64_t tbUid;
SHashObj* tbIdHash;
SSubmitMsg* pMsg;
SSubmitBlk* pBlock;
SSubmitMsgIter msgIter;
......@@ -211,6 +212,19 @@ static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t t
pHandle->tbUid = tbUid;
}
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle* pHandle, SArray* tbUidList) {
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) {
return -1;
}
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
//pHandle->tbUid = tbUid;
}
return 0;
}
void tqReadHandleSetMsg(STqReadHandle* pHandle, SSubmitMsg* pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle* pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);
......
......@@ -679,7 +679,13 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
SMqConsumeRsp rsp = {.consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL};
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
ASSERT(pConsumer);
if (pConsumer == NULL) {
pMsg->pCont = NULL;
pMsg->contLen = 0;
pMsg->code = -1;
rpcSendResponse(pMsg);
return 0;
}
int sz = taosArrayGetSize(pConsumer->topics);
for (int i = 0; i < sz; i++) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册