提交 220fdfab 编写于 作者: S Shengliang Guan

refact dnode queue

上级 35f16b20
......@@ -2319,14 +2319,6 @@ typedef struct {
#pragma pack(pop)
struct SRpcMsg;
struct SEpSet;
struct SMgmtWrapper;
typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_COMMON_MSG_CB_H_
#define _TD_COMMON_MSG_CB_H_
#include "os.h"
#ifdef __cplusplus
extern "C" {
#endif
struct SRpcMsg;
struct SEpSet;
struct SMgmtWrapper;
typedef struct SMgmtWrapper SMgmtWrapper;
typedef int32_t (*PutToQueueFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef int32_t (*SendReqFp)(struct SMgmtWrapper* pWrapper, struct SEpSet* epSet, struct SRpcMsg* pReq);
typedef int32_t (*SendMnodeReqFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pReq);
typedef void (*SendRspFp)(struct SMgmtWrapper* pWrapper, struct SRpcMsg* pRsp);
typedef enum { QUERY_QUEUE, FETCH_QUEUE, WRITE_QUEUE, APPLY_QUEUE, SYNC_QUEUE, QUEUE_MAX } EMsgQueueType;
typedef struct {
struct SMgmtWrapper* pWrapper;
PutToQueueFp queueFps[QUEUE_MAX];
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SMsgCb;
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EMsgQueueType qtype, struct SRpcMsg* pReq);
int32_t tmsgSendReq(const SMsgCb* pMsgCb, struct SEpSet* epSet, struct SRpcMsg* pReq);
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, struct SRpcMsg* pReq);
void tmsgSendRsp(const SMsgCb* pMsgCb, struct SRpcMsg* pRsp);
#ifdef __cplusplus
}
#endif
#endif /*_TD_COMMON_MSG_CB_H_*/
......@@ -16,22 +16,20 @@
#ifndef _TD_BNODE_H_
#define _TD_BNODE_H_
#include "tmsgcb.h"
#ifdef __cplusplus
extern "C" {
#endif
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SBnode SBnode;
typedef struct SBnode SBnode;
typedef struct {
} SBnodeLoad;
typedef struct {
SMgmtWrapper *pWrapper;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
SMsgCb msgCb;
} SBnodeOpt;
/* ------------------------ SBnode ------------------------ */
......
......@@ -17,27 +17,22 @@
#define _TD_MND_H_
#include "monitor.h"
#include "tmsgcb.h"
#ifdef __cplusplus
extern "C" {
#endif
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SMnode SMnode;
typedef struct SMnode SMnode;
typedef struct {
int32_t dnodeId;
int64_t clusterId;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
SMgmtWrapper *pWrapper;
PutToQueueFp putToWriteQFp;
PutToQueueFp putToReadQFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
int32_t dnodeId;
int64_t clusterId;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
SMsgCb msgCb;
} SMnodeOpt;
/* ------------------------ SMnode ------------------------ */
......
......@@ -16,13 +16,14 @@
#ifndef _TD_QNODE_H_
#define _TD_QNODE_H_
#include "tmsgcb.h"
#ifdef __cplusplus
extern "C" {
#endif
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SQnode SQnode;
typedef struct SQnode SQnode;
typedef struct {
int64_t numOfStartTask;
......@@ -36,10 +37,7 @@ typedef struct {
} SQnodeLoad;
typedef struct {
SMgmtWrapper *pWrapper;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
SMsgCb msgCb;
} SQnodeOpt;
/* ------------------------ SQnode ------------------------ */
......
......@@ -16,7 +16,7 @@
#ifndef _TD_SNODE_H_
#define _TD_SNODE_H_
#include "tcommon.h"
#include "tmsgcb.h"
#include "tmsg.h"
#include "trpc.h"
......@@ -25,17 +25,13 @@ extern "C" {
#endif
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SSnode SSnode;
typedef struct SSnode SSnode;
typedef struct {
} SSnodeLoad;
typedef struct {
SMgmtWrapper *pWrapper;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
SMsgCb msgCb;
} SSnodeOpt;
/* ------------------------ SSnode ------------------------ */
......
......@@ -20,6 +20,7 @@
extern "C" {
#endif
#include "tmsgcb.h"
#include "trpc.h"
......@@ -48,11 +49,7 @@ typedef struct {
uint64_t numOfErrors;
} SQWorkerStat;
typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *);
typedef int32_t (*sendReqFp)(void *, struct SEpSet *, struct SRpcMsg *);
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj,
putReqToQueryQFp fp1, sendReqFp fp2);
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "tmsgcb.h"
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EMsgQueueType qtype, struct SRpcMsg* pReq) {
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
}
int32_t tmsgSendReq(const SMsgCb* pMsgCb, struct SEpSet* epSet, struct SRpcMsg* pReq) {
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
}
int32_t tmsgSendMnodeReq(const SMsgCb* pMsgCb, struct SRpcMsg* pReq) {
return (*pMsgCb->sendMnodeReqFp)(pMsgCb->pWrapper, pReq);
}
void tmsgSendRsp(const SMsgCb* pMsgCb, struct SRpcMsg* pRsp) { return (*pMsgCb->sendRspFp)(pMsgCb->pWrapper, pRsp); }
......@@ -30,7 +30,7 @@ extern "C" {
#endif
typedef struct SBnode {
SBnodeOpt opt;
SMsgCb msgCb;
} SBnode;
#ifdef __cplusplus
......
......@@ -17,6 +17,7 @@
SBnode *bndOpen(const char *path, const SBnodeOpt *pOption) {
SBnode *pBnode = calloc(1, sizeof(SBnode));
pBnode->msgCb = pOption->msgCb;
return pBnode;
}
......
......@@ -19,11 +19,12 @@
static int32_t bmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void bmInitOption(SBnodeMgmt *pMgmt, SBnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
pOption->pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRspFp = dndSendRsp;
SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
pOption->msgCb = msgCb;
}
static int32_t bmOpenImp(SBnodeMgmt *pMgmt) {
......
......@@ -39,14 +39,17 @@ static int32_t mmRequire(SMgmtWrapper *pWrapper, bool *required) {
static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
pOption->pWrapper = pMgmt->pWrapper;
pOption->putToWriteQFp = mmPutMsgToWriteQueue;
pOption->putToReadQFp = mmPutMsgToReadQueue;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRspFp = dndSendRsp;
pOption->dnodeId = pDnode->dnodeId;
pOption->clusterId = pDnode->clusterId;
SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = mmPutMsgToReadQueue;
msgCb.queueFps[WRITE_QUEUE] = mmPutMsgToWriteQueue;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
pOption->msgCb = msgCb;
}
static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
......
......@@ -19,11 +19,12 @@
static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
pOption->pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRspFp = dndSendRsp;
SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
pOption->msgCb = msgCb;
}
static int32_t qmOpenImp(SQnodeMgmt *pMgmt) {
......
......@@ -19,11 +19,12 @@
static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); }
static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
pOption->pWrapper = pMgmt->pWrapper;
pOption->sendReqFp = dndSendReqToDnode;
pOption->sendMnodeReqFp = dndSendReqToMnode;
pOption->sendRspFp = dndSendRsp;
SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
pOption->msgCb = msgCb;
}
static int32_t smOpenImp(SSnodeMgmt *pMgmt) {
......
......@@ -128,7 +128,13 @@ static void *vmOpenVnodeFunc(void *param) {
pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnodeCfg cfg = {.pWrapper = pMgmt->pWrapper, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
SVnodeCfg cfg = {.msgCb = msgCb, .pTfs = pMgmt->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
......@@ -262,7 +268,6 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
SVnodesMgmt *pMgmt = calloc(1, sizeof(SVnodesMgmt));
int32_t code = -1;
SVnodeOpt vnodeOpt = {0};
dInfo("vnodes-mgmt start to init");
if (pMgmt == NULL) goto _OVER;
......@@ -294,13 +299,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
goto _OVER;
}
vnodeOpt.nthreads = tsNumOfCommitThreads;
vnodeOpt.putToQueryQFp = vmPutMsgToQueryQueue;
vnodeOpt.putToFetchQFp = vmPutMsgToQueryQueue;
vnodeOpt.sendReqFp = dndSendReqToDnode;
vnodeOpt.sendMnodeReqFp = dndSendReqToMnode;
vnodeOpt.sendRspFp = dndSendRsp;
if (vnodeInit(&vnodeOpt) != 0) {
if (vnodeInit() != 0) {
dError("failed to init vnode since %s", terrstr());
goto _OVER;
}
......
......@@ -82,7 +82,14 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) {
return -1;
}
vnodeCfg.pWrapper = pMgmt->pWrapper;
SMsgCb msgCb = {0};
msgCb.pWrapper = pMgmt->pWrapper;
msgCb.queueFps[QUERY_QUEUE] = vmPutMsgToQueryQueue;
msgCb.sendReqFp = dndSendReqToDnode;
msgCb.sendMnodeReqFp = dndSendReqToMnode;
msgCb.sendRspFp = dndSendRsp;
vnodeCfg.msgCb = msgCb;
vnodeCfg.pTfs = pMgmt->pTfs;
vnodeCfg.dbId = wrapperCfg.dbUid;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
......
......@@ -119,19 +119,12 @@ typedef struct SMnode {
SHashObj *infosMeta;
SGrantInfo grant;
MndMsgFp msgFp[TDMT_MAX];
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
PutToQueueFp putToWriteQFp;
PutToQueueFp putToReadQFp;
SMsgCb msgCb;
} SMnode;
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg);
int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg);
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp);
uint64_t mndGenerateUid(char *name, int32_t len);
void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
void mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
#ifdef __cplusplus
}
......
......@@ -623,7 +623,7 @@ static int32_t mndProcessConfigDnodeReq(SNodeMsg *pReq) {
.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen, .ahandle = pReq->rpcMsg.ahandle};
mInfo("dnode:%d, app:%p config:%s req send to dnode", cfgReq.dnodeId, rpcMsg.ahandle, cfgReq.config);
mndSendReqToDnode(pMnode, &epSet, &rpcMsg);
tmsgSendReq(&pMnode->msgCb, &epSet, &rpcMsg);
return 0;
}
......
......@@ -420,7 +420,7 @@ static int32_t mndProcessMqTimerMsg(SNodeMsg *pMsg) {
.pCont = pRebMsg,
.contLen = sizeof(SMqDoRebalanceMsg),
};
(*pMnode->putToWriteQFp)(pMnode->pWrapper, &rpcMsg);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
} else {
taosHashCleanup(pRebMsg->rebSubHash);
rpcFreeCont(pRebMsg);
......
......@@ -888,7 +888,7 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
}
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
if (mndSendReqToDnode(pMnode, &pAction->epSet, &rpcMsg) == 0) {
if (tmsgSendReq(&pMnode->msgCb, &pAction->epSet, &rpcMsg) == 0) {
mDebug("trans:%d, action:%d is sent", pTrans->id, action);
pAction->msgSent = 1;
pAction->msgReceived = 0;
......
......@@ -43,24 +43,6 @@
#define TRNAS_TIMER_MS 6000
#define TELEM_TIMER_MS 86400000
int32_t mndSendReqToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *pMsg) {
if (pMnode == NULL || pMnode->sendReqFp == NULL) {
terrno = TSDB_CODE_MND_NOT_READY;
return -1;
}
return (*pMnode->sendReqFp)(pMnode->pWrapper, pEpSet, pMsg);
}
int32_t mndSendReqToMnode(SMnode *pMnode, SRpcMsg *pMsg) {
if (pMnode == NULL || pMnode->sendReqFp == NULL) {
terrno = TSDB_CODE_MND_NOT_READY;
return -1;
}
return (*pMnode->sendMnodeReqFp)(pMnode->pWrapper, pMsg);
}
static void *mndBuildTimerMsg(int32_t *pContLen) {
SMTimerReq timerReq = {0};
......@@ -80,7 +62,7 @@ static void mndPullupTrans(void *param, void *tmrId) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
pMnode->putToWriteQFp(pMnode->pWrapper, &rpcMsg);
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
......@@ -96,7 +78,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
.pCont = pReq,
.contLen = contLen,
};
pMnode->putToReadQFp(pMnode->pWrapper, &rpcMsg);
tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg);
}
taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
......@@ -108,7 +90,7 @@ static void mndPullupTelem(void *param, void *tmrId) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
pMnode->putToReadQFp(pMnode->pWrapper, &rpcMsg);
tmsgPutToQueue(&pMnode->msgCb, QUERY_QUEUE, &rpcMsg);
}
taosTmrReset(mndPullupTelem, TELEM_TIMER_MS, pMnode, pMnode->timer, &pMnode->telemTimer);
......@@ -286,14 +268,9 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->replica = pOption->replica;
pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->pWrapper = pOption->pWrapper;
pMnode->putToWriteQFp = pOption->putToWriteQFp;
pMnode->putToReadQFp = pOption->putToReadQFp;
pMnode->sendReqFp = pOption->sendReqFp;
pMnode->sendMnodeReqFp = pOption->sendMnodeReqFp;
if (pMnode->sendReqFp == NULL || pMnode->sendMnodeReqFp == NULL ||
pMnode->putToWriteQFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
pMnode->msgCb = pOption->msgCb;
if (pMnode->dnodeId < 0 || pMnode->clusterId < 0) {
terrno = TSDB_CODE_MND_INVALID_OPTIONS;
return -1;
}
......
......@@ -32,8 +32,8 @@ typedef struct SQWorkerMgmt SQHandle;
typedef struct SQnode {
int32_t qndId;
SQnodeOpt opt;
SQHandle* pQuery;
SMsgCb msgCb;
SQHandle* pQuery;
} SQnode;
#ifdef __cplusplus
......
......@@ -13,14 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "qndInt.h"
#include "query.h"
#include "qworker.h"
#include "executor.h"
int32_t qnodePutReqToVQueryQ(SQnode* pQnode, struct SRpcMsg* pReq) {}
void qnodeSendReqToDnode(SQnode* pQnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {}
SQnode *qndOpen(const SQnodeOpt *pOption) {
SQnode *pQnode = calloc(1, sizeof(SQnode));
......@@ -29,12 +25,12 @@ SQnode *qndOpen(const SQnodeOpt *pOption) {
return NULL;
}
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, pQnode,
(putReqToQueryQFp)qnodePutReqToVQueryQ, (sendReqFp)qnodeSendReqToDnode)) {
if (qWorkerInit(NODE_TYPE_QNODE, pQnode->qndId, NULL, (void **)&pQnode->pQuery, &pOption->msgCb)) {
tfree(pQnode);
return NULL;
}
pQnode->msgCb = pOption->msgCb;
return pQnode;
}
......
......@@ -44,7 +44,7 @@ typedef struct {
typedef struct SSnode {
SStreamMeta* pMeta;
SSnodeOpt cfg;
SMsgCb msgCb;
} SSnode;
SStreamMeta* sndMetaNew();
......
......@@ -22,7 +22,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
if (pSnode == NULL) {
return NULL;
}
memcpy(&pSnode->cfg, pOption, sizeof(SSnodeOpt));
pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = sndMetaNew();
if (pSnode->pMeta == NULL) {
free(pSnode);
......
......@@ -18,6 +18,7 @@
#include "os.h"
#include "trpc.h"
#include "tmsgcb.h"
#include "meta.h"
#include "tarray.h"
......@@ -40,7 +41,6 @@ typedef struct {
typedef struct {
int32_t vgId;
uint64_t dbId;
void *pWrapper;
STfs *pTfs;
uint64_t wsize;
uint64_t ssize;
......@@ -54,20 +54,12 @@ typedef struct {
SMetaCfg metaCfg;
STqCfg tqCfg;
SWalCfg walCfg;
SMsgCb msgCb;
uint32_t hashBegin;
uint32_t hashEnd;
int8_t hashMethod;
} SVnodeCfg;
typedef struct {
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
PutToQueueFp putToQueryQFp;
PutToQueueFp putToFetchQFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SVnodeOpt;
typedef struct {
int64_t ver;
int64_t tbUid;
......@@ -87,10 +79,9 @@ typedef struct {
/**
* @brief Initialize the vnode module
*
* @param pOption Option of the vnode mnodule
* @return int 0 for success and -1 for failure
*/
int vnodeInit(const SVnodeOpt *pOption);
int vnodeInit();
/**
* @brief Cleanup the vnode module
......
......@@ -52,12 +52,6 @@ typedef struct SVnodeMgr {
TdThreadMutex mutex;
TdThreadCond hasTask;
TD_DLIST(SVnodeTask) queue;
// For vnode Mgmt
PutToQueueFp putToQueryQFp;
PutToQueueFp putToFetchQFp;
SendReqFp sendReqFp;
SendMnodeReqFp sendMnodeReqFp;
SendRspFp sendRspFp;
} SVnodeMgr;
extern SVnodeMgr vnodeMgr;
......@@ -81,7 +75,7 @@ struct SVnode {
SWal* pWal;
tsem_t canCommit;
SQHandle* pQuery;
void* pWrapper;
SMsgCb msgCb;
STfs* pTfs;
};
......
......@@ -27,7 +27,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) {
SVnodeCfg cfg = defaultVnodeOptions;
if (pVnodeCfg != NULL) {
cfg.vgId = pVnodeCfg->vgId;
cfg.pWrapper = pVnodeCfg->pWrapper;
cfg.msgCb = pVnodeCfg->msgCb;
cfg.pTfs = pVnodeCfg->pTfs;
cfg.dbId = pVnodeCfg->dbId;
cfg.hashBegin = pVnodeCfg->hashBegin;
......@@ -79,7 +79,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
}
pVnode->vgId = pVnodeCfg->vgId;
pVnode->pWrapper = pVnodeCfg->pWrapper;
pVnode->msgCb = pVnodeCfg->msgCb;
pVnode->pTfs = pVnodeCfg->pTfs;
pVnode->path = strdup(path);
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
......
......@@ -14,43 +14,33 @@
*/
#include "vnd.h"
#include "tglobal.h"
SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED};
static void* loop(void* arg);
int vnodeInit(const SVnodeOpt *pOption) {
int vnodeInit() {
if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) {
return 0;
}
vnodeMgr.stop = false;
vnodeMgr.putToQueryQFp = pOption->putToQueryQFp;
vnodeMgr.putToFetchQFp = pOption->putToFetchQFp;
vnodeMgr.sendReqFp = pOption->sendReqFp;
vnodeMgr.sendMnodeReqFp = pOption->sendMnodeReqFp;
vnodeMgr.sendRspFp = pOption->sendRspFp;
// Start commit handers
if (pOption->nthreads > 0) {
vnodeMgr.nthreads = pOption->nthreads;
vnodeMgr.threads = (TdThread*)calloc(pOption->nthreads, sizeof(TdThread));
if (vnodeMgr.threads == NULL) {
return -1;
}
vnodeMgr.nthreads = tsNumOfCommitThreads;
vnodeMgr.threads = calloc(vnodeMgr.nthreads, sizeof(TdThread));
if (vnodeMgr.threads == NULL) {
return -1;
}
taosThreadMutexInit(&(vnodeMgr.mutex), NULL);
taosThreadCondInit(&(vnodeMgr.hasTask), NULL);
TD_DLIST_INIT(&(vnodeMgr.queue));
taosThreadMutexInit(&(vnodeMgr.mutex), NULL);
taosThreadCondInit(&(vnodeMgr.hasTask), NULL);
TD_DLIST_INIT(&(vnodeMgr.queue));
for (uint16_t i = 0; i < pOption->nthreads; i++) {
taosThreadCreate(&(vnodeMgr.threads[i]), NULL, loop, NULL);
// pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
}
} else {
// TODO: if no commit thread is set, then another mechanism should be
// given. Otherwise, it is a false.
ASSERT(0);
for (uint16_t i = 0; i < vnodeMgr.nthreads; i++) {
taosThreadCreate(&(vnodeMgr.threads[i]), NULL, loop, NULL);
// pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
}
if (walInit() < 0) {
......@@ -92,26 +82,6 @@ int vnodeScheduleTask(SVnodeTask* pTask) {
return 0;
}
int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
return (*vnodeMgr.putToQueryQFp)(pVnode->pWrapper, pReq);
}
int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq) {
return (*vnodeMgr.putToFetchQFp)(pVnode->pWrapper, pReq);
}
int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {
return (*vnodeMgr.sendReqFp)(pVnode->pWrapper, epSet, pReq);
}
int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq) {
return (*vnodeMgr.sendMnodeReqFp)(pVnode->pWrapper, pReq);
}
void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp) {
(*vnodeMgr.sendRspFp)(pVnode->pWrapper, pRsp);
}
/* ------------------------ STATIC METHODS ------------------------ */
static void* loop(void* arg) {
setThreadName("vnode-commit");
......
......@@ -20,8 +20,7 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg);
int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode,
(putReqToQueryQFp)vnodePutToVQueryQ, (sendReqFp)vnodeSendReq);
return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, &pVnode->msgCb);
}
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
......
......@@ -20,6 +20,7 @@
extern "C" {
#endif
#include "qworker.h"
#include "tlockfree.h"
#include "ttimer.h"
......@@ -139,18 +140,16 @@ typedef struct SQWSchStatus {
// Qnode/Vnode level task management
typedef struct SQWorkerMgmt {
SQWorkerCfg cfg;
int8_t nodeType;
int32_t nodeId;
void *timer;
tmr_h hbTimer;
SRWLatch schLock;
//SRWLatch ctxLock;
SHashObj *schHash; //key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx
void *nodeObj;
putReqToQueryQFp putToQueueFp;
sendReqFp sendReqFp;
SQWorkerCfg cfg;
int8_t nodeType;
int32_t nodeId;
void *timer;
tmr_h hbTimer;
SRWLatch schLock;
// SRWLatch ctxLock;
SHashObj *schHash; // key: schedulerId, value: SQWSchStatus
SHashObj *ctxHash; // key: queryId+taskId, value: SQWTaskCtx
SMsgCb msgCb;
} SQWorkerMgmt;
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId
......
......@@ -1442,9 +1442,8 @@ _return:
taosTmrReset(qwProcessHbTimerEvent, QW_DEFAULT_HEARTBEAT_MSEC, param, mgmt->timer, &mgmt->hbTimer);
}
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj,
putReqToQueryQFp fp1, sendReqFp fp2) {
if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp1 || NULL == fp2) {
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, const SMsgCb *pMsgCb) {
if (NULL == qWorkerMgmt || pMsgCb->pWrapper == NULL) {
qError("invalid param to init qworker");
QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -1500,9 +1499,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
mgmt->nodeType = nodeType;
mgmt->nodeId = nodeId;
mgmt->nodeObj = nodeObj;
mgmt->putToQueueFp = fp1;
mgmt->sendReqFp = fp2;
mgmt->msgCb = *pMsgCb;
*qWorkerMgmt = mgmt;
......
......@@ -245,15 +245,15 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, void *connection) {
req->taskId = tId;
SRpcMsg pNewMsg = {
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
.handle = pMsg->handle,
.ahandle = pMsg->ahandle,
.msgType = TDMT_VND_QUERY_CONTINUE,
.pCont = req,
.contLen = sizeof(SQueryContinueReq),
.code = 0,
};
int32_t code = (*mgmt->putToQueueFp)(mgmt->nodeObj, &pNewMsg);
int32_t code = tmsgPutToQueue(&mgmt->msgCb, QUERY_QUEUE, &pNewMsg);
if (TSDB_CODE_SUCCESS != code) {
QW_SCH_TASK_ELOG("put query continue msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code));
rpcFreeCont(req);
......
......@@ -1080,7 +1080,10 @@ TEST(rcTest, shortExecshortDelay) {
qwtTestStop = false;
qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL);
SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0;
......@@ -1161,7 +1164,10 @@ TEST(rcTest, longExecshortDelay) {
qwtTestStop = false;
qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL);
SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 1000000;
......@@ -1244,7 +1250,10 @@ TEST(rcTest, shortExeclongDelay) {
qwtTestStop = false;
qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL);
SMsgCb msgCb = {0};
msgCb.pWrapper = (struct SMgmtWrapper *)mockPointer;
msgCb.queueFps[QUERY_QUEUE] = (PutToQueueFp)qwtPutReqToQueue;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册