提交 402a5e3b 编写于 作者: L Liu Jicong

add scheduler

上级 7510a0e7
...@@ -16,6 +16,9 @@ ...@@ -16,6 +16,9 @@
#ifndef _TD_SNODE_H_ #ifndef _TD_SNODE_H_
#define _TD_SNODE_H_ #define _TD_SNODE_H_
#include "tmsg.h"
#include "trpc.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
...@@ -88,4 +91,4 @@ void sndDestroy(const char *path); ...@@ -88,4 +91,4 @@ void sndDestroy(const char *path);
} }
#endif #endif
#endif /*_TD_SNODE_H_*/ #endif /*_TD_SNODE_H_*/
\ No newline at end of file
...@@ -661,6 +661,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons ...@@ -661,6 +661,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
return buf; return buf;
} }
typedef struct {
} SStreamScheduler;
typedef struct SMnodeMsg { typedef struct SMnodeMsg {
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_SCHEDULER_H_
#define _TD_MND_SCHEDULER_H_
#include "mndInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t mndInitScheduler(SMnode* pMnode);
void mndCleanupScheduler(SMnode* pMnode);
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_SCHEDULER_H_ */
...@@ -340,9 +340,16 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { ...@@ -340,9 +340,16 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
pVgroup->compStorage = pVload->compStorage; pVgroup->compStorage = pVload->compStorage;
pVgroup->pointsWritten = pVload->pointsWritten; pVgroup->pointsWritten = pVload->pointsWritten;
} }
bool roleChanged = false;
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) { for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
if (pVgroup->vnodeGid[vg].role != pVload->role) {
roleChanged = true;
}
pVgroup->vnodeGid[vg].role = pVload->role; pVgroup->vnodeGid[vg].role = pVload->role;
} }
if (roleChanged) {
// notify scheduler role has changed
}
} }
mndReleaseVgroup(pMnode, pVgroup); mndReleaseVgroup(pMnode, pVgroup);
...@@ -631,13 +638,13 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp ...@@ -631,13 +638,13 @@ static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp
pShow->bytes[cols] = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE; pShow->bytes[cols] = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name"); strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = pShow->bytes[cols]; pSchema[cols].bytes = pShow->bytes[cols];
cols++; cols++;
pShow->bytes[cols] = TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE; pShow->bytes[cols] = TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY; pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "value"); strcpy(pSchema[cols].name, "value");
pSchema[cols].bytes = pShow->bytes[cols]; pSchema[cols].bytes = pShow->bytes[cols];
cols++; cols++;
...@@ -823,4 +830,4 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, i ...@@ -823,4 +830,4 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pReq, SShowObj *pShow, char *data, i
static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) { static void mndCancelGetNextDnode(SMnode *pMnode, void *pIter) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
} }
\ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "mndScheduler.h"
#include "mndConsumer.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndOffset.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndSubscribe.h"
#include "mndTopic.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "tcompare.h"
#include "tname.h"
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub) {
SSdb* pSdb = pMnode->pSdb;
SVgObj* pVgroup = NULL;
SQueryDag* pDag = qStringToDag(pTopic->physicalPlan);
SArray* pAray = NULL;
SArray* unassignedVg = pSub->unassignedVg;
ASSERT(pSub->vgNum == 0);
int32_t levelNum = taosArrayGetSize(pDag->pSubplans);
if (levelNum != 1) {
return -1;
}
SArray* inner = taosArrayGet(pDag->pSubplans, 0);
SSubplan* plan = taosArrayGetP(inner, 0);
void* pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
if (pIter == NULL) break;
if (pVgroup->dbUid != pTopic->dbUid) {
sdbRelease(pSdb, pVgroup);
continue;
}
pSub->vgNum++;
plan->execNode.nodeId = pVgroup->vgId;
plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup);
SMqConsumerEp consumerEp = {0};
consumerEp.status = 0;
consumerEp.consumerId = -1;
consumerEp.epSet = plan->execNode.epset;
consumerEp.vgId = plan->execNode.nodeId;
int32_t msgLen;
int32_t code = qSubPlanToString(plan, &consumerEp.qmsg, &msgLen);
taosArrayPush(unassignedVg, &consumerEp);
}
return 0;
}
...@@ -20,6 +20,7 @@ ...@@ -20,6 +20,7 @@
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
#include "mndOffset.h" #include "mndOffset.h"
#include "mndScheduler.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndTopic.h" #include "mndTopic.h"
...@@ -97,12 +98,21 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj ...@@ -97,12 +98,21 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
strcpy(pSub->key, key); strcpy(pSub->key, key);
free(key); free(key);
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
tDeleteSMqSubscribeObj(pSub);
free(pSub);
return NULL;
}
#if 0
if (mndInitUnassignedVg(pMnode, pTopic, pSub) < 0) { if (mndInitUnassignedVg(pMnode, pTopic, pSub) < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tDeleteSMqSubscribeObj(pSub); tDeleteSMqSubscribeObj(pSub);
free(pSub); free(pSub);
return NULL; return NULL;
} }
#endif
// TODO: disable alter subscribed table // TODO: disable alter subscribed table
return pSub; return pSub;
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
SSnode *pSnode = calloc(1, sizeof(SSnode)); SSnode *pSnode = calloc(1, sizeof(SSnode));
memcpy(&pSnode->cfg, pOption, sizeof(SSnodeOpt));
return pSnode; return pSnode;
} }
...@@ -29,4 +30,4 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -29,4 +30,4 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
return 0; return 0;
} }
void sndDestroy(const char *path) {} void sndDestroy(const char *path) {}
\ No newline at end of file
...@@ -376,7 +376,10 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { ...@@ -376,7 +376,10 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
pTopic->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta);
SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta}; SReadHandle handle = {
.reader = pReadHandle,
.meta = pTq->pVnodeMeta,
};
pTopic->buffer.output[i].pReadHandle = pReadHandle; pTopic->buffer.output[i].pReadHandle = pReadHandle;
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, &handle);
} }
......
...@@ -28,6 +28,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { ...@@ -28,6 +28,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
pReadHandle->sver = -1; pReadHandle->sver = -1;
pReadHandle->pSchema = NULL; pReadHandle->pSchema = NULL;
pReadHandle->pSchemaWrapper = NULL; pReadHandle->pSchemaWrapper = NULL;
pReadHandle->tbIdHash = NULL;
return pReadHandle; return pReadHandle;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册