From 402a5e3b6114903d02104dfb2d4b0a065f87a27e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 7 Mar 2022 16:30:28 +0800 Subject: [PATCH] add scheduler --- include/dnode/snode/snode.h | 5 +- source/dnode/mnode/impl/inc/mndDef.h | 3 + source/dnode/mnode/impl/inc/mndScheduler.h | 34 ++++++++++ source/dnode/mnode/impl/src/mndDnode.c | 13 +++- source/dnode/mnode/impl/src/mndScheduler.c | 73 ++++++++++++++++++++++ source/dnode/mnode/impl/src/mndSubscribe.c | 10 +++ source/dnode/snode/src/snode.c | 3 +- source/dnode/vnode/src/tq/tq.c | 5 +- source/dnode/vnode/src/tq/tqRead.c | 1 + 9 files changed, 141 insertions(+), 6 deletions(-) create mode 100644 source/dnode/mnode/impl/inc/mndScheduler.h create mode 100644 source/dnode/mnode/impl/src/mndScheduler.c diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 319708decc..c9fab140cc 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -16,6 +16,9 @@ #ifndef _TD_SNODE_H_ #define _TD_SNODE_H_ +#include "tmsg.h" +#include "trpc.h" + #ifdef __cplusplus extern "C" { #endif @@ -88,4 +91,4 @@ void sndDestroy(const char *path); } #endif -#endif /*_TD_SNODE_H_*/ \ No newline at end of file +#endif /*_TD_SNODE_H_*/ diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 91d37c4eed..05f7de60ff 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -661,6 +661,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons return buf; } +typedef struct { +} SStreamScheduler; + typedef struct SMnodeMsg { char user[TSDB_USER_LEN]; char db[TSDB_DB_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h new file mode 100644 index 0000000000..3bf6e0c33a --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -0,0 +1,34 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_SCHEDULER_H_ +#define _TD_MND_SCHEDULER_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitScheduler(SMnode* pMnode); +void mndCleanupScheduler(SMnode* pMnode); + +int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_SCHEDULER_H_ */ diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index b872e933d5..e231717e61 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -340,9 +340,16 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { pVgroup->compStorage = pVload->compStorage; pVgroup->pointsWritten = pVload->pointsWritten; } + bool roleChanged = false; for (int32_t vg = 0; vg < pVgroup->replica; ++vg) { + if (pVgroup->vnodeGid[vg].role != pVload->role) { + roleChanged = true; + } pVgroup->vnodeGid[vg].role = pVload->role; } + if (roleChanged) { + // notify scheduler role has changed + } } mndReleaseVgroup(pMnode, pVgroup); @@ -631,13 +638,13 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp pShow->bytes[cols] = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "name"); + strcpy(pSchema[cols].name, "name"); pSchema[cols].bytes = pShow->bytes[cols]; cols++; pShow->bytes[cols] = TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE; pSchema[cols].type = TSDB_DATA_TYPE_BINARY; - strcpy(pSchema[cols].name, "value"); + strcpy(pSchema[cols].name, "value"); pSchema[cols].bytes = pShow->bytes[cols]; cols++; @@ -823,4 +830,4 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, i static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c new file mode 100644 index 0000000000..9b301ad98a --- /dev/null +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "mndScheduler.h" +#include "mndConsumer.h" +#include "mndDb.h" +#include "mndDnode.h" +#include "mndMnode.h" +#include "mndOffset.h" +#include "mndShow.h" +#include "mndStb.h" +#include "mndSubscribe.h" +#include "mndTopic.h" +#include "mndTrans.h" +#include "mndUser.h" +#include "mndVgroup.h" +#include "tcompare.h" +#include "tname.h" + +int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) { + SSdb* pSdb = pMnode->pSdb; + SVgObj* pVgroup = NULL; + SQueryDag* pDag = qStringToDag(pTopic->physicalPlan); + SArray* pAray = NULL; + SArray* unassignedVg = pSub->unassignedVg; + + ASSERT(pSub->vgNum == 0); + + int32_t levelNum = taosArrayGetSize(pDag->pSubplans); + if (levelNum != 1) { + return -1; + } + + SArray* inner = taosArrayGet(pDag->pSubplans, 0); + SSubplan* plan = taosArrayGetP(inner, 0); + + void* pIter = NULL; + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); + if (pIter == NULL) break; + if (pVgroup->dbUid != pTopic->dbUid) { + sdbRelease(pSdb, pVgroup); + continue; + } + + pSub->vgNum++; + plan->execNode.nodeId = pVgroup->vgId; + plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup); + + SMqConsumerEp consumerEp = {0}; + consumerEp.status = 0; + consumerEp.consumerId = -1; + consumerEp.epSet = plan->execNode.epset; + consumerEp.vgId = plan->execNode.nodeId; + int32_t msgLen; + int32_t code = qSubPlanToString(plan, &consumerEp.qmsg, &msgLen); + taosArrayPush(unassignedVg, &consumerEp); + } + + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 60533b979c..84360ab23b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -20,6 +20,7 @@ #include "mndDnode.h" #include "mndMnode.h" #include "mndOffset.h" +#include "mndScheduler.h" #include "mndShow.h" #include "mndStb.h" #include "mndTopic.h" @@ -97,12 +98,21 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj strcpy(pSub->key, key); free(key); + if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { + terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC; + tDeleteSMqSubscribeObj(pSub); + free(pSub); + return NULL; + } + +#if 0 if (mndInitUnassignedVg(pMnode, pTopic, pSub) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; tDeleteSMqSubscribeObj(pSub); free(pSub); return NULL; } +#endif // TODO: disable alter subscribed table return pSub; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 5d21a037e7..01500fbc54 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -17,6 +17,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *pSnode = calloc(1, sizeof(SSnode)); + memcpy(&pSnode->cfg, pOption, sizeof(SSnodeOpt)); return pSnode; } @@ -29,4 +30,4 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { return 0; } -void sndDestroy(const char *path) {} \ No newline at end of file +void sndDestroy(const char *path) {} diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ccad006657..d540b589b7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -376,7 +376,10 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) { pTopic->buffer.output[i].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); - SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta}; + SReadHandle handle = { + .reader = pReadHandle, + .meta = pTq->pVnodeMeta, + }; pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index bfa811feec..2f24df0309 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -28,6 +28,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { pReadHandle->sver = -1; pReadHandle->pSchema = NULL; pReadHandle->pSchemaWrapper = NULL; + pReadHandle->tbIdHash = NULL; return pReadHandle; } -- GitLab