提交 5ab6166e 编写于 作者: D dapan1121

enh: support grant check in vnode query

上级 dbfc2d3f
......@@ -216,14 +216,9 @@ typedef struct SEp {
uint16_t port;
} SEp;
#define SHOW_REWRITE_MASK() (1 << 0)
#define TEST_SHOW_REWRITE_MASK(m) (((m) & SHOW_REWRITE_MASK()) != 0)
typedef struct {
int32_t contLen;
int32_t vgId;
int32_t msgMask;
} SMsgHead;
// Submit message for one table
......@@ -1616,6 +1611,7 @@ typedef struct SSubQueryMsg {
int8_t needFetch;
uint32_t sqlLen; // the query sql,
uint32_t phyLen;
int32_t msgMask;
char msg[];
} SSubQueryMsg;
......
......@@ -57,6 +57,10 @@ typedef enum {
#define QUERY_RSP_POLICY_DELAY 0
#define QUERY_RSP_POLICY_QUICK 1
#define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0)
#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0)
typedef struct STableComInfo {
uint8_t numOfTags; // the number of tags in schema
uint8_t precision; // the number of precision
......
......@@ -76,7 +76,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S
int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGrant);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts);
......
......@@ -145,7 +145,6 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
pHead->contLen = ntohl(pHead->contLen);
pHead->vgId = ntohl(pHead->vgId);
pHead->msgMask = ntohl(pHead->msgMask);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
if (pVnode == NULL) {
......@@ -156,15 +155,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
switch (qtype) {
case QUERY_QUEUE:
if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) && !TEST_SHOW_REWRITE_MASK(pHead->msgMask)) {
terrno = TSDB_CODE_GRANT_EXPIRED;
code = terrno;
dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr(code));
} else {
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pQueryQ, pMsg);
}
vnodePreprocessQueryMsg(pVnode->pImpl, pMsg);
dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg);
taosWriteQitem(pVnode->pQueryQ, pMsg);
break;
case STREAM_QUEUE:
dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg);
......
......@@ -21,7 +21,7 @@
int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) {
if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return 0;
SMnode *pMnode = pMsg->info.node;
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg);
return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg, false);
}
void mndPostProcessQueryMsg(SRpcMsg *pMsg) {
......
......@@ -519,7 +519,6 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb,
pHead->contLen = htonl(contLen);
pHead->vgId = htonl(pVgroup->vgId);
pHead->msgMask = htonl(0);
void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead));
......
......@@ -69,7 +69,7 @@ int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) {
return 0;
}
return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg);
return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg, false);
}
int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
......
......@@ -329,7 +329,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return 0;
}
return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg);
return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, TDMT_SCH_QUERY == pMsg->msgType);
}
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
......
......@@ -8,6 +8,7 @@
#include "tcommon.h"
#include "tmsg.h"
#include "tname.h"
#include "tgrant.h"
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) {
int32_t msgSize = sizeof(SRetrieveTableRsp) + length;
......@@ -305,7 +306,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGrant) {
if (NULL == qWorkerMgmt || NULL == pMsg) {
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
......@@ -326,6 +327,12 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) {
msg->execId = ntohl(msg->execId);
msg->phyLen = ntohl(msg->phyLen);
msg->sqlLen = ntohl(msg->sqlLen);
msg->msgMask = ntohl(msg->msgMask);
if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg->msgMask)) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) {
QW_ELOG("query failed cause of grant expired, msgMask:%d", msg->msgMask);
QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED);
}
uint64_t sId = msg->sId;
uint64_t qId = msg->queryId;
......
......@@ -1047,7 +1047,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SSubQueryMsg *pMsg = msg;
pMsg->header.vgId = htonl(addr->nodeId);
pMsg->header.msgMask = htonl((pTask->plan->showRewrite) ? SHOW_REWRITE_MASK() : 0);
pMsg->sId = htobe64(schMgmt.sId);
pMsg->queryId = htobe64(pJob->queryId);
pMsg->taskId = htobe64(pTask->taskId);
......@@ -1058,6 +1057,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
pMsg->needFetch = SCH_TASK_NEED_FETCH(pTask);
pMsg->phyLen = htonl(pTask->msgLen);
pMsg->sqlLen = htonl(len);
pMsg->msgMask = htonl((pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0);
memcpy(pMsg->msg, pJob->sql, len);
memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen);
......
......@@ -72,7 +72,6 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId)
SMsgHead head;
head.vgId = vgId;
head.contLen = sizeof(SMsgHead);
head.msgMask = 0;
SRpcMsg rpcMsg;
memset(&rpcMsg, 0, sizeof(SRpcMsg));
rpcMsg.contLen = head.contLen;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册