diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c0ac7da5bf502bbb7408b64b0d67c66954981ce2..8aa85fa6b4f9a131f74d701ac15657dd620c12ec 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -216,14 +216,9 @@ typedef struct SEp { uint16_t port; } SEp; -#define SHOW_REWRITE_MASK() (1 << 0) - -#define TEST_SHOW_REWRITE_MASK(m) (((m) & SHOW_REWRITE_MASK()) != 0) - typedef struct { int32_t contLen; int32_t vgId; - int32_t msgMask; } SMsgHead; // Submit message for one table @@ -1616,6 +1611,7 @@ typedef struct SSubQueryMsg { int8_t needFetch; uint32_t sqlLen; // the query sql, uint32_t phyLen; + int32_t msgMask; char msg[]; } SSubQueryMsg; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 651b37985178f0ce8e66b5fb55114aca43cc4028..74c1a8c07cd261fac97951bf5c0a8699adbb509c 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -57,6 +57,10 @@ typedef enum { #define QUERY_RSP_POLICY_DELAY 0 #define QUERY_RSP_POLICY_QUICK 1 +#define QUERY_MSG_MASK_SHOW_REWRITE() (1 << 0) +#define TEST_SHOW_REWRITE_MASK(m) (((m) & QUERY_MSG_MASK_SHOW_REWRITE()) != 0) + + typedef struct STableComInfo { uint8_t numOfTags; // the number of tags in schema uint8_t precision; // the number of precision diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 7a1e9bb2721c214dcf0ff961ca62dc0953d54882..6ddd906700dfb74517aa383a50cc6f45b39578c8 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -76,7 +76,7 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, void **qWorkerMgmt, const S int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); -int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg); +int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGrant); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_t ts); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index ccd781d1386ebc67364a12694ce9e10e57ac228a..d401f9c17bbed0f479d7cf4d242f61a1f5f00c84 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -145,7 +145,6 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp pHead->contLen = ntohl(pHead->contLen); pHead->vgId = ntohl(pHead->vgId); - pHead->msgMask = ntohl(pHead->msgMask); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId); if (pVnode == NULL) { @@ -156,15 +155,9 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp switch (qtype) { case QUERY_QUEUE: - if ((pMsg->msgType == TDMT_SCH_QUERY) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS) && !TEST_SHOW_REWRITE_MASK(pHead->msgMask)) { - terrno = TSDB_CODE_GRANT_EXPIRED; - code = terrno; - dDebug("vgId:%d, msg:%p put into vnode-query queue failed since %s", pVnode->vgId, pMsg, terrstr(code)); - } else { - vnodePreprocessQueryMsg(pVnode->pImpl, pMsg); - dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg); - taosWriteQitem(pVnode->pQueryQ, pMsg); - } + vnodePreprocessQueryMsg(pVnode->pImpl, pMsg); + dGTrace("vgId:%d, msg:%p put into vnode-query queue", pVnode->vgId, pMsg); + taosWriteQitem(pVnode->pQueryQ, pMsg); break; case STREAM_QUEUE: dGTrace("vgId:%d, msg:%p put into vnode-stream queue", pVnode->vgId, pMsg); diff --git a/source/dnode/mnode/impl/src/mndQuery.c b/source/dnode/mnode/impl/src/mndQuery.c index ca69d0c71c1a069739e6f1d5f90d99bdb5be3fd3..2b0edfebc2b1333f63a81203f8912e7d6c436034 100644 --- a/source/dnode/mnode/impl/src/mndQuery.c +++ b/source/dnode/mnode/impl/src/mndQuery.c @@ -21,7 +21,7 @@ int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg) { if (TDMT_SCH_QUERY != pMsg->msgType && TDMT_SCH_MERGE_QUERY != pMsg->msgType) return 0; SMnode *pMnode = pMsg->info.node; - return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg); + return qWorkerPreprocessQueryMsg(pMnode->pQuery, pMsg, false); } void mndPostProcessQueryMsg(SRpcMsg *pMsg) { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index b6f7e316389059c9b63bad1e12a24478b4bb1dbd..cee0b84672a6d8e5c57c6718f1f665b2ab82e934 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -519,7 +519,6 @@ static void *mndBuildVDropStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb, pHead->contLen = htonl(contLen); pHead->vgId = htonl(pVgroup->vgId); - pHead->msgMask = htonl(0); void *pBuf = POINTER_SHIFT(pHead, sizeof(SMsgHead)); diff --git a/source/dnode/qnode/src/qnode.c b/source/dnode/qnode/src/qnode.c index 289251f56092ac0f343775f35554d5314f27a41c..5efc714e95c85b528c24d64fc9642788d06c99ec 100644 --- a/source/dnode/qnode/src/qnode.c +++ b/source/dnode/qnode/src/qnode.c @@ -69,7 +69,7 @@ int32_t qndPreprocessQueryMsg(SQnode *pQnode, SRpcMsg *pMsg) { return 0; } - return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg); + return qWorkerPreprocessQueryMsg(pQnode->pQuery, pMsg, false); } int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 78d95cf0d78591cc9d1f28ea0222fdc106072f61..089905ee203ac8a894d9889f9eff93bab19195b3 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -329,7 +329,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return 0; } - return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg); + return qWorkerPreprocessQueryMsg(pVnode->pQuery, pMsg, TDMT_SCH_QUERY == pMsg->msgType); } int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 14ac4f5bea8a3b6b82b817add71d2f277eed48d8..7e7f71b1764f79f6c6b52a77d8e981d285f12add 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -8,6 +8,7 @@ #include "tcommon.h" #include "tmsg.h" #include "tname.h" +#include "tgrant.h" int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp) { int32_t msgSize = sizeof(SRetrieveTableRsp) + length; @@ -305,7 +306,7 @@ int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo * return TSDB_CODE_SUCCESS; } -int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { +int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg, bool chkGrant) { if (NULL == qWorkerMgmt || NULL == pMsg) { QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -326,6 +327,12 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { msg->execId = ntohl(msg->execId); msg->phyLen = ntohl(msg->phyLen); msg->sqlLen = ntohl(msg->sqlLen); + msg->msgMask = ntohl(msg->msgMask); + + if (chkGrant && (!TEST_SHOW_REWRITE_MASK(msg->msgMask)) && (grantCheck(TSDB_GRANT_TIME) != TSDB_CODE_SUCCESS)) { + QW_ELOG("query failed cause of grant expired, msgMask:%d", msg->msgMask); + QW_ERR_RET(TSDB_CODE_GRANT_EXPIRED); + } uint64_t sId = msg->sId; uint64_t qId = msg->queryId; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 17f1ea6728cfc933dd5f0585ce0e1f0e1f1283ad..1b5a6efc002a7f3961108a0a0036f3e0b7f3b5e5 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1047,7 +1047,6 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SSubQueryMsg *pMsg = msg; pMsg->header.vgId = htonl(addr->nodeId); - pMsg->header.msgMask = htonl((pTask->plan->showRewrite) ? SHOW_REWRITE_MASK() : 0); pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId); @@ -1058,6 +1057,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->needFetch = SCH_TASK_NEED_FETCH(pTask); pMsg->phyLen = htonl(pTask->msgLen); pMsg->sqlLen = htonl(len); + pMsg->msgMask = htonl((pTask->plan->showRewrite) ? QUERY_MSG_MASK_SHOW_REWRITE() : 0); memcpy(pMsg->msg, pJob->sql, len); memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index aba61edf0d35900dff0e1930318b83d18caec7a6..6d372acf2fd1b349e60309ecc0dcc87e995a8d01 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -72,7 +72,6 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) SMsgHead head; head.vgId = vgId; head.contLen = sizeof(SMsgHead); - head.msgMask = 0; SRpcMsg rpcMsg; memset(&rpcMsg, 0, sizeof(SRpcMsg)); rpcMsg.contLen = head.contLen;