提交 b22986ce 编写于 作者: C Cary Xu

feat: support grant

上级 b2f7d70b
...@@ -139,6 +139,7 @@ extern int32_t tsTransPullupInterval; ...@@ -139,6 +139,7 @@ extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval; extern int32_t tsMqRebalanceInterval;
extern int32_t tsTtlUnit; extern int32_t tsTtlUnit;
extern int32_t tsTtlPushInterval; extern int32_t tsTtlPushInterval;
extern int32_t tsGrantHBInterval;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
......
...@@ -149,6 +149,7 @@ enum { ...@@ -149,6 +149,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_TELEM_TIMER, "telem-tmr", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TRANS_TIMER, "trans-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_TTL_TIMER, "ttl-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT_HB_TIMER, "grant-hb-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_TRANS, "kill-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_QUERY, "kill-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_CONN, "kill-conn", NULL, NULL)
......
...@@ -189,6 +189,7 @@ int32_t tsTransPullupInterval = 2; ...@@ -189,6 +189,7 @@ int32_t tsTransPullupInterval = 2;
int32_t tsMqRebalanceInterval = 2; int32_t tsMqRebalanceInterval = 2;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 60; int32_t tsTtlPushInterval = 60;
int32_t tsGrantHBInterval = 1;
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) { void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
......
...@@ -46,6 +46,7 @@ int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); ...@@ -46,6 +46,7 @@ int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessGrantRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessServerRunStatus(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessGrantReq(SRpcMsg *pMsg);
// dmWorker.c // dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
......
...@@ -331,7 +331,8 @@ SArray *dmGetMsgHandles() { ...@@ -331,7 +331,8 @@ SArray *dmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_SYSTABLE_RETRIEVE, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
// Requests handled by MNODE // Requests handled by MNODE
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
// if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH_RSP, dmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
code = 0; code = 0;
......
...@@ -144,6 +144,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { ...@@ -144,6 +144,9 @@ static void dmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) {
case TDMT_DND_SYSTABLE_RETRIEVE: case TDMT_DND_SYSTABLE_RETRIEVE:
code = dmProcessRetrieve(pMgmt, pMsg); code = dmProcessRetrieve(pMgmt, pMsg);
break; break;
case TDMT_MND_GRANT:
code = dmProcessGrantReq(pMsg);
break;
default: default:
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
break; break;
......
...@@ -204,7 +204,7 @@ SArray *mmGetMsgHandles() { ...@@ -204,7 +204,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; // if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_AUTH, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SHOW_VARIABLES, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_SERVER_VERSION, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
......
...@@ -29,6 +29,7 @@ void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode); ...@@ -29,6 +29,7 @@ void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode);
SEpSet mndGetDnodeEpset(SDnodeObj *pDnode); SEpSet mndGetDnodeEpset(SDnodeObj *pDnode);
int32_t mndGetDnodeSize(SMnode *pMnode); int32_t mndGetDnodeSize(SMnode *pMnode);
bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs); bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs);
void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -262,7 +262,7 @@ bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) { ...@@ -262,7 +262,7 @@ bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) {
return true; return true;
} }
static void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) { void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfEps = 0; int32_t numOfEps = 0;
......
...@@ -118,8 +118,11 @@ static int32_t mndRetrieveGrant(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl ...@@ -118,8 +118,11 @@ static int32_t mndRetrieveGrant(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
return numOfRows; return numOfRows;
} }
static int32_t mndProcessGrantHB(SRpcMsg *pReq) { return TSDB_CODE_SUCCESS; }
int32_t mndInitGrant(SMnode *pMnode) { int32_t mndInitGrant(SMnode *pMnode) {
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_GRANTS, mndRetrieveGrant); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_GRANTS, mndRetrieveGrant);
mndSetMsgHandle(pMnode, TDMT_MND_GRANT_HB_TIMER, mndProcessGrantHB);
return 0; return 0;
} }
...@@ -129,6 +132,7 @@ int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; } ...@@ -129,6 +132,7 @@ int32_t grantCheck(EGrantType grant) { return TSDB_CODE_SUCCESS; }
void grantReset(EGrantType grant, uint64_t value) {} void grantReset(EGrantType grant, uint64_t value) {}
void grantAdd(EGrantType grant, uint64_t value) {} void grantAdd(EGrantType grant, uint64_t value) {}
void grantRestore(EGrantType grant, uint64_t value) {} void grantRestore(EGrantType grant, uint64_t value) {}
int32_t dmProcessGrantReq(SRpcMsg *pMsg) { return TSDB_CODE_SUCCESS; }
#endif #endif
......
...@@ -90,6 +90,16 @@ static void mndPullupTelem(SMnode *pMnode) { ...@@ -90,6 +90,16 @@ static void mndPullupTelem(SMnode *pMnode) {
} }
} }
static void mndGrantHeartBeat(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {
.msgType = TDMT_MND_GRANT_HB_TIMER, .pCont = pReq, .contLen = contLen, .info.ahandle = (void *)0x9527};
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
}
static void *mndThreadFp(void *param) { static void *mndThreadFp(void *param) {
SMnode *pMnode = param; SMnode *pMnode = param;
int64_t lastTime = 0; int64_t lastTime = 0;
...@@ -115,6 +125,10 @@ static void *mndThreadFp(void *param) { ...@@ -115,6 +125,10 @@ static void *mndThreadFp(void *param) {
if (lastTime % (tsTelemInterval * 10) == 0) { if (lastTime % (tsTelemInterval * 10) == 0) {
mndPullupTelem(pMnode); mndPullupTelem(pMnode);
} }
if (lastTime % (tsGrantHBInterval * 10) == 0) {
mndGrantHeartBeat(pMnode);
}
} }
return NULL; return NULL;
......
...@@ -271,7 +271,7 @@ void ctgFreeHandle(SCatalog* pCtg) { ...@@ -271,7 +271,7 @@ void ctgFreeHandle(SCatalog* pCtg) {
taosMemoryFree(pCtg); taosMemoryFree(pCtg);
ctgInfo("handle freed, culsterId:0x%" PRIx64, clusterId); ctgInfo("handle freed, clusterId:0x%" PRIx64, clusterId);
} }
void ctgClearHandle(SCatalog* pCtg) { void ctgClearHandle(SCatalog* pCtg) {
...@@ -302,7 +302,7 @@ void ctgClearHandle(SCatalog* pCtg) { ...@@ -302,7 +302,7 @@ void ctgClearHandle(SCatalog* pCtg) {
CTG_CACHE_STAT_INC(numOfClear, 1); CTG_CACHE_STAT_INC(numOfClear, 1);
ctgInfo("handle cleared, culsterId:0x%" PRIx64, clusterId); ctgInfo("handle cleared, clusterId:0x%" PRIx64, clusterId);
} }
void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) { void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册