提交 6cf90b0a 编写于 作者: L Liu Jicong

add timer for mq rebalance

上级 61fd0565
......@@ -1089,6 +1089,10 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq
return buf;
}
typedef struct SMqTmrMsg {
int32_t reserved;
} SMqTmrMsg;
typedef struct {
int64_t status;
} SMVSubscribeRsp;
......
......@@ -140,6 +140,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg)
// Requests handled by VNODE
TD_NEW_MSG_SEG(TDMT_VND_MSG)
......
......@@ -148,30 +148,30 @@ TEST(testCase, connect_Test) {
// taos_close(pConn);
//}
//
//TEST(testCase, create_db_Test) {
//TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//assert(pConn != NULL);
TEST(testCase, create_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
//TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2");
//if (taos_errno(pRes) != 0) {
//printf("error in create db, reason:%s\n", taos_errstr(pRes));
//}
TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
}
//TAOS_FIELD* pFields = taos_fetch_fields(pRes);
//ASSERT_TRUE(pFields == NULL);
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == NULL);
//int32_t numOfFields = taos_num_fields(pRes);
//ASSERT_EQ(numOfFields, 0);
int32_t numOfFields = taos_num_fields(pRes);
ASSERT_EQ(numOfFields, 0);
//taos_free_result(pRes);
taos_free_result(pRes);
pRes = taos_query(pConn, "create database abc1 vgroups 4");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
}
taos_close(pConn);
}
//pRes = taos_query(pConn, "create database abc1 vgroups 4");
//if (taos_errno(pRes) != 0) {
//printf("error in create db, reason:%s\n", taos_errstr(pRes));
//}
//taos_close(pConn);
//}
//
//TEST(testCase, create_dnode_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
......
......@@ -100,7 +100,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg;
......
......@@ -354,6 +354,7 @@ typedef struct SMqSubscribeObj {
char key[TSDB_SUBSCRIBE_KEY_LEN];
int32_t epoch;
//TODO: replace with priority queue
int32_t nextConsumerIdx;
SArray* availConsumer; // SArray<int64_t> (consumerId)
SArray* assigned; // SArray<SMqConsumerEp>
SArray* unassignedConsumer; // SArray<SMqConsumerEp>
......
......@@ -80,6 +80,7 @@ typedef struct SMnode {
SReplica replicas[TSDB_MAX_REPLICA];
tmr_h timer;
tmr_h transTimer;
tmr_h mqTimer;
char *path;
SMnodeCfg cfg;
int64_t checkTime;
......
......@@ -13,13 +13,13 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndSubscribe.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
......@@ -40,6 +40,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic);
int32_t mndInitSubscribe(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_SUBSCRIBE,
......@@ -54,9 +58,90 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
/*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/
/*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
return sdbSetTable(pMnode->pSdb, table);
}
static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
int i = 0;
while (key[i] != ':') {
i++;
}
key[i] = 0;
*topic = strdup(key);
key[i] = ':';
*cgroup = strdup(&key[i + 1]);
return 0;
}
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
SSdb *pSdb = pMnode->pSdb;
SMqSubscribeObj *pSub = NULL;
void *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub);
int sz;
while (pIter != NULL) {
if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) {
char *topic = NULL;
char *cgroup = NULL;
mndSplitSubscribeKey(pSub->key, &topic, &cgroup);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
// create trans
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
for (int i = 0; i < sz; i++) {
int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx);
SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
pCEp->consumerId = consumerId;
taosArrayPush(pSub->assigned, pCEp);
pSub->nextConsumerIdx++;
// build msg
SMqSetCVgReq req = {
.vgId = pCEp->vgId,
.consumerId = consumerId,
};
strcpy(req.cGroup, cgroup);
strcpy(req.topicName, topic);
strcpy(req.sql, pTopic->sql);
strcpy(req.logicalPlan, pTopic->logicalPlan);
strcpy(req.physicalPlan, pTopic->physicalPlan);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *reqStr = malloc(tlen);
if (reqStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
void *abuf = reqStr;
tEncodeSMqSetCVgReq(abuf, &req);
// persist msg
STransAction action = {0};
action.epSet = pCEp->epset;
action.pCont = reqStr;
action.contLen = tlen;
action.msgType = TDMT_VND_MQ_SET_CONN;
mndTransAppendRedoAction(pTrans, &action);
// persist raw
SSdbRaw *pRaw = mndSubActionEncode(pSub);
mndTransAppendRedolog(pTrans, pRaw);
tfree(topic);
tfree(cgroup);
}
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
}
mndReleaseTopic(pMnode, pTopic);
mndTransDrop(pTrans);
}
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub);
}
return 0;
}
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
SMqConsumerEp CEp;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
......@@ -76,7 +161,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
}
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
SMqConsumerTopic *pConsumerTopic) {
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic) {
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
for (int32_t i = 0; i < sz; i++) {
int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i);
......@@ -86,7 +171,10 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
.consumerId = pConsumer->consumerId,
};
strcpy(req.cGroup, pConsumer->cgroup);
strcpy(req.topicName, pConsumerTopic->name);
strcpy(req.topicName, pTopic->name);
strcpy(req.sql, pTopic->sql);
strcpy(req.logicalPlan, pTopic->logicalPlan);
strcpy(req.physicalPlan, pTopic->physicalPlan);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *reqStr = malloc(tlen);
if (reqStr == NULL) {
......@@ -94,7 +182,7 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
return -1;
}
void *abuf = reqStr;
tEncodeSMqSetCVgReq(abuf, &req);
tEncodeSMqSetCVgReq(&abuf, &req);
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
......@@ -278,20 +366,20 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
int i = 0, j = 0;
while (i < newTopicNum || j < oldTopicNum) {
char* newTopicName = NULL;
char* oldTopicName = NULL;
char *newTopicName = NULL;
char *oldTopicName = NULL;
if (i >= newTopicNum) {
// encode unset topic msg to all vnodes related to that topic
oldTopicName = ((SMqConsumerTopic*)taosArrayGet(oldSub, j))->name;
oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name;
j++;
} else if (j >= oldTopicNum) {
newTopicName = taosArrayGet(newSub, i);
i++;
} else {
newTopicName = taosArrayGet(newSub, i);
oldTopicName = ((SMqConsumerTopic*)taosArrayGet(oldSub, j))->name;
oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name;
int comp = compareLenPrefixedStr(newTopicName, oldTopicName);
int comp = compareLenPrefixedStr(newTopicName, oldTopicName);
if (comp == 0) {
// do nothing
oldTopicName = newTopicName = NULL;
......@@ -374,19 +462,25 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
// set unassigned vg
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
}
taosArrayPush(pSub->availConsumer, &consumerId);
//TODO: no need
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
taosArrayPush(pConsumer->topics, pConsumerTopic);
if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) {
int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo);
// send setmsg to vnode
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic) < 0) {
if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic) < 0) {
// TODO
return -1;
}
}
taosArrayDestroy(pConsumerTopic->pVgInfo);
free(pConsumerTopic);
SSdbRaw *pRaw = mndSubActionEncode(pSub);
/*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/
mndTransAppendRedolog(pTrans, pRaw);
#if 0
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
if (pGroup == NULL) {
......
......@@ -18,6 +18,7 @@
#include "mndAuth.h"
#include "mndBnode.h"
#include "mndCluster.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndFunc.h"
......@@ -27,6 +28,7 @@
#include "mndShow.h"
#include "mndSnode.h"
#include "mndStb.h"
#include "mndSubscribe.h"
#include "mndSync.h"
#include "mndTelem.h"
#include "mndTopic.h"
......@@ -69,15 +71,15 @@ static void mndTransReExecute(void *param, void *tmrId) {
taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
}
static void mndCalMqRebalance(void* param, void* tmrId) {
SMnode* pMnode = param;
static void mndCalMqRebalance(void *param, void *tmrId) {
SMnode *pMnode = param;
if (mndIsMaster(pMnode)) {
// iterate cgroup, cal rebalance
// sync with raft
// write sdb
SMqTmrMsg *pMsg = rpcMallocCont(sizeof(SMqTmrMsg));
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pMsg, .contLen = sizeof(SMqTmrMsg)};
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
}
taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer);
}
static int32_t mndInitTimer(SMnode *pMnode) {
......@@ -95,6 +97,11 @@ static int32_t mndInitTimer(SMnode *pMnode) {
return -1;
}
if (taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
......@@ -102,6 +109,8 @@ static void mndCleanupTimer(SMnode *pMnode) {
if (pMnode->timer != NULL) {
taosTmrStop(pMnode->transTimer);
pMnode->transTimer = NULL;
taosTmrStop(pMnode->mqTimer);
pMnode->mqTimer = NULL;
taosTmrCleanUp(pMnode->timer);
pMnode->timer = NULL;
}
......@@ -171,6 +180,8 @@ static int32_t mndInitSteps(SMnode *pMnode) {
if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
......@@ -377,7 +388,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) {
return NULL;
}
if (pRpcMsg->msgType != TDMT_MND_TRANS) {
if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER) {
SRpcConnInfo connInfo = {0};
if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) {
taosFreeQitem(pMsg);
......
......@@ -108,6 +108,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: handle error
}
break;
case TDMT_VND_MQ_SET_CONN: {
char* reqStr = ptr;
SMqSetCVgReq req;
/*tDecodeSMqSetCVgReq(reqStr, &req);*/
// create topic if not exist
// convert to task
// write mq meta
}
break;
default:
ASSERT(0);
break;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册