提交 a7f44f38 编写于 作者: D dapan1121

use global uid as query id

上级 deb3ad7e
...@@ -39,6 +39,7 @@ extern int8_t tsEnableTelemetryReporting; ...@@ -39,6 +39,7 @@ extern int8_t tsEnableTelemetryReporting;
extern char tsEmail[]; extern char tsEmail[];
extern char tsArbitrator[]; extern char tsArbitrator[];
extern int8_t tsArbOnline; extern int8_t tsArbOnline;
extern int32_t tsDnodeId;
// common // common
extern int tsRpcTimer; extern int tsRpcTimer;
......
...@@ -43,6 +43,7 @@ int8_t tsEnableVnodeBak = 1; ...@@ -43,6 +43,7 @@ int8_t tsEnableVnodeBak = 1;
int8_t tsEnableTelemetryReporting = 1; int8_t tsEnableTelemetryReporting = 1;
int8_t tsArbOnline = 0; int8_t tsArbOnline = 0;
char tsEmail[TSDB_FQDN_LEN] = {0}; char tsEmail[TSDB_FQDN_LEN] = {0};
int32_t tsDnodeId = 0;
// common // common
int32_t tsRpcTimer = 1000; int32_t tsRpcTimer = 1000;
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "cJSON.h" #include "cJSON.h"
#include "dnodeCfg.h" #include "dnodeCfg.h"
#include "tglobal.h"
static SDnodeCfg tsCfg = {0}; static SDnodeCfg tsCfg = {0};
static pthread_mutex_t tsCfgMutex; static pthread_mutex_t tsCfgMutex;
...@@ -70,6 +71,7 @@ static void dnodeResetCfg(SDnodeCfg *cfg) { ...@@ -70,6 +71,7 @@ static void dnodeResetCfg(SDnodeCfg *cfg) {
pthread_mutex_lock(&tsCfgMutex); pthread_mutex_lock(&tsCfgMutex);
tsCfg.dnodeId = cfg->dnodeId; tsCfg.dnodeId = cfg->dnodeId;
tsDnodeId = cfg->dnodeId;
tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN); tstrncpy(tsCfg.clusterId, cfg->clusterId, TSDB_CLUSTER_ID_LEN);
dnodePrintCfg(cfg); dnodePrintCfg(cfg);
dnodeWriteCfg(); dnodeWriteCfg();
......
...@@ -92,6 +92,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo); ...@@ -92,6 +92,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qId, uint64_t qInfo);
void** qAcquireQInfo(void* pMgmt, uint64_t key); void** qAcquireQInfo(void* pMgmt, uint64_t key);
void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle); void** qReleaseQInfo(void* pMgmt, void* pQInfo, bool freeHandle);
bool checkQIdEqual(void *qHandle, uint64_t qId); bool checkQIdEqual(void *qHandle, uint64_t qId);
int64_t genQueryId(void);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -105,6 +105,30 @@ int32_t getMaximumIdleDurationSec() { ...@@ -105,6 +105,30 @@ int32_t getMaximumIdleDurationSec() {
return tsShellActivityTimer * 2; return tsShellActivityTimer * 2;
} }
int64_t genQueryId(void) {
int64_t uid = 0;
int64_t did = tsDnodeId;
uid = did << 54;
int64_t pid = ((int64_t)taosGetPId()) & 0x3FF;
uid |= pid << 44;
int64_t ts = taosGetTimestampMs() & 0x1FFFFFFFF;
uid |= ts << 11;
int64_t sid = atomic_add_fetch_64(&queryHandleId, 1) & 0x7FF;
uid |= sid;
return uid;
}
static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) { static void getNextTimeWindow(SQuery* pQuery, STimeWindow* tw) {
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
if (pQuery->interval.intervalUnit != 'n' && pQuery->interval.intervalUnit != 'y') { if (pQuery->interval.intervalUnit != 'n' && pQuery->interval.intervalUnit != 'y') {
...@@ -6184,6 +6208,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr ...@@ -6184,6 +6208,8 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
goto _cleanup_qinfo; goto _cleanup_qinfo;
} }
pQInfo->qId = *qId;
// to make sure third party won't overwrite this structure // to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo; pQInfo->signature = pQInfo;
SQuery* pQuery = &pQInfo->query; SQuery* pQuery = &pQInfo->query;
...@@ -6316,8 +6342,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr ...@@ -6316,8 +6342,6 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SSqlGroupbyExpr* pGroupbyExpr
// todo refactor // todo refactor
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX); pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
pQInfo->qId = atomic_add_fetch_64(&queryHandleId, 1);
*qId = pQInfo->qId;
qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo); qDebug("qmsg:%p QInfo:%" PRIu64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
return pQInfo; return pQInfo;
......
...@@ -197,6 +197,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi ...@@ -197,6 +197,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
return code; return code;
} }
bool qTableQuery(qinfo_t qinfo, uint64_t *qId) { bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
SQInfo *pQInfo = (SQInfo *)qinfo; SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo); assert(pQInfo && pQInfo->signature == pQInfo);
......
...@@ -208,6 +208,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) { ...@@ -208,6 +208,7 @@ static void vnodeBuildNoResultQueryRsp(SRspRet *pRet) {
pRsp->completed = true; pRsp->completed = true;
} }
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
void * pCont = pRead->pCont; void * pCont = pRead->pCont;
int32_t contLen = pRead->contLen; int32_t contLen = pRead->contLen;
...@@ -226,7 +227,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -226,7 +227,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (contLen != 0) { if (contLen != 0) {
qinfo_t pQInfo = NULL; qinfo_t pQInfo = NULL;
uint64_t qId = 0; uint64_t qId = genQueryId();
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId); code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, &qId);
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册