From a7f44f38e0be1da90cd8a541b9b34f0f11672101 Mon Sep 17 00:00:00 2001 From: dapan1121 <89396746@qq.com> Date: Sat, 10 Apr 2021 11:37:10 +0800 Subject: [PATCH] use global uid as query id --- src/common/inc/tglobal.h | 1 + src/common/src/tglobal.c | 1 + src/dnode/src/dnodeCfg.c | 2 ++ src/inc/query.h | 1 + src/query/src/qExecutor.c | 28 ++++++++++++++++++++++++++-- src/query/src/queryMain.c | 1 + src/vnode/src/vnodeRead.c | 3 ++- 7 files changed, 34 insertions(+), 3 deletions(-) diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 3f96466cc0..26475834d5 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -39,6 +39,7 @@ extern int8_t tsEnableTelemetryReporting; extern char tsEmail[]; extern char tsArbitrator[]; extern int8_t tsArbOnline; +extern int32_t tsDnodeId; // common extern int tsRpcTimer; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index ea5bf7954d..69b01e6c08 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -43,6 +43,7 @@ int8_t tsEnableVnodeBak = 1; int8_t tsEnableTelemetryReporting = 1; int8_t tsArbOnline = 0; char tsEmail[TSDB_FQDN_LEN] = {0}; +int32_t tsDnodeId = 0; // common int32_t tsRpcTimer = 1000; diff --git a/src/dnode/src/dnodeCfg.c b/src/dnode/src/dnodeCfg.c index fd5956b37f..c573d709f5 100644 --- a/src/dnode/src/dnodeCfg.c +++ b/src/dnode/src/dnodeCfg.c @@ -17,6 +17,7 @@ #include "os.h" #include "cJSON.h" #include "dnodeCfg.h" +#include "tglobal.h" static SDnodeCfg tsCfg = {0}; static pthread_mutex_t tsCfgMutex; @@ -70,6 +71,7 @@ static void dnodeResetCfg(SDnodeCfg *cfg) { pthread_mutex_lock(&tsCfgMutex); tsCfg.dnodeId = cfg->dnodeId; + tsDnodeId = cfg->dnodeId; tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN); dnodePrintCfg(cfg); dnodeWriteCfg(); diff --git a/src/inc/query.h b/src/inc/query.h index c9dabcef54..d4ff811a8d 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -92,6 +92,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo); void** qAcquireQInfo(void* pMgmt, uint64_t key); void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle); bool checkQIdEqual(void *qHandle, uint64_t qId); +int64_t genQueryId(void); #ifdef __cplusplus } diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index dda67d77b2..c7e4223db5 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -105,6 +105,30 @@ int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } + +int64_t genQueryId(void) { + int64_t uid = 0; + int64_t did = tsDnodeId; + + uid = did << 54; + + int64_t pid = ((int64_t)taosGetPId()) & 0x3FF; + + uid |= pid << 44; + + int64_t ts = taosGetTimestampMs() & 0x1FFFFFFFF; + + uid |= ts << 11; + + int64_t sid = atomic_add_fetch_64(&queryHandleId, 1) & 0x7FF; + + uid |= sid; + + return uid; +} + + + static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if (pQuery->interval.intervalUnit != 'n' && pQuery->interval.intervalUnit != 'y') { @@ -6184,6 +6208,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr goto _cleanup_qinfo; } + pQInfo->qId = *qId; + // to make sure third party won't overwrite this structure pQInfo->signature = pQInfo; SQuery* pQuery = &pQInfo->query; @@ -6316,8 +6342,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr // todo refactor pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX); - pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1); - *qId = pQInfo->qId; qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo); return pQInfo; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index 2ff0c9676e..838f6aa0c9 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -197,6 +197,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi return code; } + bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { SQInfo *pQInfo = (SQInfo *)qinfo; assert(pQInfo && pQInfo->signature == pQInfo); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 0836ade77f..5a7fc52931 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -208,6 +208,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) { pRsp->completed = true; } + static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { void * pCont = pRead->pCont; int32_t contLen = pRead->contLen; @@ -226,7 +227,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { if (contLen != 0) { qinfo_t pQInfo = NULL; - uint64_t qId = 0; + uint64_t qId = genQueryId(); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId); SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); -- GitLab