未验证 提交 59fd6fe8 编写于 作者: S slguan 提交者: GitHub

Merge pull request #907 from taosdata/feature/liaohj

Feature/liaohj
......@@ -528,8 +528,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
numOfFetch++;
}
} else {
if ((pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i])) &&
tscProjectionQueryOnTable(&pSql->cmd)) || (pRes->numOfRows == 0)) {
if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i]))) {
numOfFetch++;
}
}
......
......@@ -1290,6 +1290,12 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu
SSqlObj *pNew = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query
pNew->cmd.type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
assert(pNew->cmd.numOfTables == 1);
//launch subquery for each vnode, so the subquery index equals to the vnodeIndex.
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0);
pMeterMetaInfo->vnodeIndex = trsupport->subqueryIndex;
pSql->pSubs[trsupport->subqueryIndex] = pNew;
}
......@@ -1507,7 +1513,6 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
pQueryMsg->uid = pMeterMeta->uid;
pQueryMsg->numOfTagsCols = 0;
} else { // query on metric
SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
if (pMeterMetaInfo->vnodeIndex < 0) {
tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex);
return -1;
......@@ -2872,15 +2877,14 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) {
pElem->groupbyTagColumnList = htonl(offset);
for (int32_t j = 0; j < pCmd->groupbyExpr.numOfGroupCols; ++j) {
SColIndexEx *pCol = &pCmd->groupbyExpr.columnInfo[j];
*((int16_t *)pMsg) = pCol->colId;
pMsg += sizeof(pCol->colId);
*((int16_t *)pMsg) += pCol->colIdx;
pMsg += sizeof(pCol->colIdx);
*((int16_t *)pMsg) += pCol->flag;
pMsg += sizeof(pCol->flag);
SColIndexEx* pDestCol = (SColIndexEx*) pMsg;
pDestCol->colIdxInBuf = 0;
pDestCol->colIdx = htons(pCol->colIdx);
pDestCol->colId = htons(pDestCol->colId);
pDestCol->flag = htons(pDestCol->flag);
pMsg += sizeof(SColIndexEx);
}
}
}
......
......@@ -1706,12 +1706,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
pNew->fp = fp;
pNew->param = param;
SMeterMetaInfo* pMetermetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex);
char key[TSDB_MAX_TAGS_LEN + 1] = {0};
tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid);
tscGetMetricMetaCacheKey(pCmd, key, uid);
#ifdef _DEBUG_VIEW
printf("the metricmeta key is:%s\n", key);
......@@ -1736,7 +1734,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
}
assert(pFinalInfo->pMeterMeta != NULL);
if (UTIL_METER_IS_METRIC(pMetermetaInfo)) {
if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
assert(pFinalInfo->pMetricMeta != NULL);
}
......
......@@ -206,26 +206,6 @@ typedef struct {
char cont[];
} SVMsgHeader;
/*
* The value of QInfo.signature is used to denote that a query is executing, it isn't safe to release QInfo yet.
* The release operations will be blocked in a busy-waiting until the query operation reach a safepoint.
* Then it will reset the signature in a atomic operation, followed by release operation.
* Only the QInfo.signature == QInfo, this structure can be released safely.
*/
#define TSDB_QINFO_QUERY_FLAG 0x1
#define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x))
#define TSDB_QINFO_SET_QUERY_FLAG(x) \
atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG);
// live lock: wait for query reaching a safe-point, release all resources
// belongs to this query
#define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \
{ \
while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \
taosMsleep(1); \
} \
}
struct tSQLBinaryExpr;
typedef struct SColumnInfoEx {
......@@ -337,7 +317,7 @@ extern void * vnodeTmrCtrl;
// read API
extern int (*vnodeSearchKeyFunc[])(char *pValue, int num, TSKEY key, int order);
void *vnodeQueryInTimeRange(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *sqlExprs,
void *vnodeQueryOnSingleTable(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *sqlExprs,
SQueryMeterMsg *pQueryMsg, int *code);
void *vnodeQueryOnMultiMeters(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
......@@ -370,6 +350,8 @@ void vnodeFreeQInfo(void *, bool);
void vnodeFreeQInfoInQueue(void *param);
bool vnodeIsQInfoValid(void *param);
void vnodeDecRefCount(void *param);
void vnodeAddRefCount(void *param);
int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQuery);
......
......@@ -52,7 +52,7 @@ typedef struct SQueryLoadCompBlockInfo {
* the header file info for one vnode
*/
typedef struct SHeaderFileInfo {
int32_t fileID; // file id
int32_t fileID; // file id
} SHeaderFileInfo;
typedef struct SQueryCostSummary {
......@@ -99,17 +99,17 @@ typedef struct SQueryFilesInfo {
uint32_t numOfFiles; // the total available number of files for this virtual node during query execution
int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap.
int32_t vnodeId;
int32_t headerFd; // header file fd
char* pHeaderFileData; // mmap header files
int64_t headFileSize;
int32_t dataFd;
int32_t lastFd;
char headerFilePath[PATH_MAX]; // current opened header file name
char dataFilePath[PATH_MAX]; // current opened data file name
char lastFilePath[PATH_MAX]; // current opened last file path
char dbFilePathPrefix[PATH_MAX];
int32_t headerFd; // header file fd
char* pHeaderFileData; // mmap header files
int64_t headFileSize;
int32_t dataFd;
int32_t lastFd;
char headerFilePath[PATH_MAX]; // current opened header file name
char dataFilePath[PATH_MAX]; // current opened data file name
char lastFilePath[PATH_MAX]; // current opened last file path
char dbFilePathPrefix[PATH_MAX];
} SQueryFilesInfo;
typedef struct RuntimeEnvironment {
......@@ -129,17 +129,17 @@ typedef struct RuntimeEnvironment {
SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */
SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */
SQueryFilesInfo vnodeFileInfo;
int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
int16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo;
SData** pInterpoBuf;
SOutputRes* pResult; // reference to SQuerySupporter->pResult
void* hashList;
int32_t usedIndex; // assigned SOutputRes in list
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostSummary summary;
int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS];
int16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo;
SData** pInterpoBuf;
SOutputRes* pResult; // reference to SQuerySupporter->pResult
void* hashList;
int32_t usedIndex; // assigned SOutputRes in list
STSBuf* pTSBuf;
STSCursor cur;
SQueryCostSummary summary;
} SQueryRuntimeEnv;
/* intermediate result during multimeter query involves interval */
......@@ -214,14 +214,12 @@ typedef struct SMeterQuerySupportObj {
SMeterDataInfo* pMeterDataInfo;
TSKEY* tsList;
int32_t tsNum;
TSKEY* tsList;
} SMeterQuerySupportObj;
typedef struct _qinfo {
uint64_t signature;
uint64_t signature;
int32_t refCount; // QInfo reference count, when the value is 0, it can be released safely
char user[TSDB_METER_ID_LEN + 1];
char sql[TSDB_SHOW_SQL_LEN];
uint8_t stream;
......@@ -231,24 +229,21 @@ typedef struct _qinfo {
int64_t useconds;
int killed;
struct _qinfo *prev, *next;
SQuery query;
int num;
int totalPoints;
int pointsRead;
int pointsReturned;
int pointsInterpo;
int code;
char bufIndex;
char changed;
char over;
SMeterObj* pObj;
sem_t dataReady;
SQuery query;
int num;
int totalPoints;
int pointsRead;
int pointsReturned;
int pointsInterpo;
int code;
char bufIndex;
char changed;
char over;
SMeterObj* pObj;
int (*fp)(SMeterObj*, SQuery*);
sem_t dataReady;
SMeterQuerySupportObj* pMeterQuerySupporter;
int (*fp)(SMeterObj*, SQuery*);
} SQInfo;
int32_t vnodeQuerySingleMeterPrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, SMeterQuerySupportObj* pSMultiMeterObj,
......
......@@ -1094,9 +1094,12 @@ static SMetricMetaElemMsg *doConvertMetricMetaMsg(SMetricMetaMsg *pMetricMetaMsg
pElem->groupbyTagColumnList = htonl(pElem->groupbyTagColumnList);
int16_t *groupColIds = (int16_t*) (((char *)pMetricMetaMsg) + pElem->groupbyTagColumnList);
SColIndexEx *groupColIds = (SColIndexEx*) (((char *)pMetricMetaMsg) + pElem->groupbyTagColumnList);
for (int32_t i = 0; i < pElem->numOfGroupCols; ++i) {
groupColIds[i] = htons(groupColIds[i]);
groupColIds[i].colId = htons(groupColIds[i].colId);
groupColIds[i].colIdx = htons(groupColIds[i].colIdx);
groupColIds[i].flag = htons(groupColIds[i].flag);
groupColIds[i].colIdxInBuf = 0;
}
return pElem;
......
......@@ -784,14 +784,15 @@ int mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pMsg, int32_t tableIndex, tQuer
}
// todo refactor!!!!!
static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, void* param) {
static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, int32_t len, char* param) {
if (offset == TSDB_TBNAME_COLUMN_INDEX) {
extractMeterName(pMeter->meterId, param);
return param;
} else {
char* tags = pMeter->pTagData + TSDB_METER_ID_LEN; // tag start position
return (tags + offset);
char* tags = pMeter->pTagData + offset + TSDB_METER_ID_LEN; // tag start position
memcpy(param, tags, len); // make sure the value is null-terminated string
}
return param;
}
bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
......@@ -799,8 +800,9 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*)param;
STabObj* pMeter = (STabObj*)(((tSkipListNode*)pNode)->pData);
char name[TSDB_METER_NAME_LEN + 1] = {0};
char* val = getTagValueFromMeter(pMeter, pInfo->offset, name);
char buf[TSDB_MAX_TAGS_LEN] = {0};
char* val = getTagValueFromMeter(pMeter, pInfo->offset, pInfo->sch.bytes, buf);
int8_t type = pInfo->sch.type;
int32_t ret = 0;
......
......@@ -896,6 +896,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
pSupporter->pMeterDataInfo = (SMeterDataInfo *)calloc(1, sizeof(SMeterDataInfo) * pSupporter->numOfMeters);
if (pSupporter->pMeterDataInfo == NULL) {
dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
return;
}
......@@ -1172,12 +1173,13 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
}
if (pQInfo->killed) {
TSDB_QINFO_RESET_SIG(pQInfo);
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo);
dTrace("QInfo:%p it is already killed, abort", pQInfo);
vnodeDecRefCount(pQInfo);
return;
}
assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
assert(pQInfo->refCount >= 1);
SQuery * pQuery = &pQInfo->query;
SMeterObj *pMeterObj = pQInfo->pObj;
......@@ -1211,10 +1213,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
dTrace("QInfo:%p reset signature", pQInfo);
sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo);
vnodeDecRefCount(pQInfo);
return;
}
......@@ -1233,23 +1233,22 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead,
pQInfo->pointsInterpo, pQInfo->pointsReturned);
dTrace("QInfo:%p reset signature", pQInfo);
sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo);
vnodeDecRefCount(pQInfo);
return;
}
}
}
pQInfo->over = 1;
dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned, reset signature", pQInfo,
dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo,
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo->pointsRead);
vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter);
sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo);
vnodeDecRefCount(pQInfo);
return;
}
......@@ -1277,15 +1276,15 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
/* check if query is killed or not */
if (isQueryKilled(pQuery)) {
dTrace("QInfo:%p query is killed, reset signature", pQInfo);
dTrace("QInfo:%p query is killed", pQInfo);
pQInfo->over = 1;
} else {
dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned, reset signature",
dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned",
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead);
}
sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo);
vnodeDecRefCount(pQInfo);
}
void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
......@@ -1296,12 +1295,12 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
}
if (pQInfo->killed) {
TSDB_QINFO_RESET_SIG(pQInfo);
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo);
vnodeDecRefCount(pQInfo);
dTrace("QInfo:%p it is already killed, abort", pQInfo);
return;
}
assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
assert(pQInfo->refCount >= 1);
SQuery *pQuery = &pQInfo->query;
pQuery->pointsRead = 0;
......@@ -1322,7 +1321,6 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
pQInfo->useconds += (taosGetTimestampUs() - st);
pQInfo->over = isQueryKilled(pQuery) ? 1 : 0;
dTrace("QInfo:%p reset signature", pQInfo);
taosInterpoSetStartInfo(&pQInfo->pMeterQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead,
pQInfo->query.interpoType);
......@@ -1330,11 +1328,11 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
if (pQuery->pointsRead == 0) {
pQInfo->over = 1;
dTrace("QInfo:%p over, %d meters queried, %d points are returned, reset signature", pQInfo, pSupporter->numOfMeters,
dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters,
pQInfo->pointsRead);
vnodePrintQueryStatistics(pSupporter);
}
sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo);
vnodeDecRefCount(pQInfo);
}
......@@ -392,10 +392,10 @@ __clean_memory:
return NULL;
}
static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) {
SQInfo *pQInfo = (SQInfo *)pMsg->ahandle;
vnodeFreeQInfo(pQInfo, true);
}
//static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) {
// SQInfo *pQInfo = (SQInfo *)pMsg->ahandle;
// vnodeFreeQInfo(pQInfo, true);
//}
void vnodeFreeQInfoInQueue(void *param) {
SQInfo *pQInfo = (SQInfo *)param;
......@@ -403,15 +403,18 @@ void vnodeFreeQInfoInQueue(void *param) {
if (!vnodeIsQInfoValid(pQInfo)) return;
pQInfo->killed = 1;
dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo);
SSchedMsg schedMsg = {0};
schedMsg.fp = vnodeFreeQInfoInQueueImpl;
schedMsg.msg = NULL;
schedMsg.thandle = (void *)1;
schedMsg.ahandle = param;
taosScheduleTask(queryQhandle, &schedMsg);
dTrace("QInfo:%p set kill flag to free QInfo");
vnodeDecRefCount(pQInfo);
// dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo);
// SSchedMsg schedMsg = {0};
// schedMsg.fp = vnodeFreeQInfoInQueueImpl;
// schedMsg.msg = NULL;
// schedMsg.thandle = (void *)1;
// schedMsg.ahandle = param;
// taosScheduleTask(queryQhandle, &schedMsg);
}
void vnodeFreeQInfo(void *param, bool decQueryRef) {
......@@ -419,8 +422,6 @@ void vnodeFreeQInfo(void *param, bool decQueryRef) {
if (!vnodeIsQInfoValid(param)) return;
pQInfo->killed = 1;
TSDB_WAIT_TO_SAFE_DROP_QINFO(pQInfo);
SMeterObj *pObj = pQInfo->pObj;
dTrace("QInfo:%p start to free SQInfo", pQInfo);
......@@ -499,7 +500,30 @@ bool vnodeIsQInfoValid(void *param) {
* into local variable, then compare by using local variable
*/
uint64_t sig = pQInfo->signature;
return (sig == (uint64_t)pQInfo) || (sig == TSDB_QINFO_QUERY_FLAG);
return (sig == (uint64_t)pQInfo);
}
void vnodeDecRefCount(void *param) {
SQInfo *pQInfo = (SQInfo*) param;
assert(vnodeIsQInfoValid(pQInfo));
int32_t ref = atomic_sub_fetch_32(&pQInfo->refCount, 1);
assert(ref >= 0);
dTrace("QInfo:%p decrease obj refcount, %d", pQInfo, ref);
if (ref == 0) {
vnodeFreeQInfo(pQInfo, true);
}
}
void vnodeAddRefCount(void *param) {
SQInfo *pQInfo = (SQInfo*) param;
assert(vnodeIsQInfoValid(pQInfo));
int32_t ref = atomic_add_fetch_32(&pQInfo->refCount, 1);
dTrace("QInfo:%p add refcount, %d", pQInfo, ref);
}
void vnodeQueryData(SSchedMsg *pMsg) {
......@@ -509,12 +533,11 @@ void vnodeQueryData(SSchedMsg *pMsg) {
pQInfo = (SQInfo *)pMsg->ahandle;
if (pQInfo->killed) {
TSDB_QINFO_RESET_SIG(pQInfo);
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo);
dTrace("QInfo:%p it is already killed, abort", pQInfo);
vnodeDecRefCount(pQInfo);
return;
}
assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
pQuery = &(pQInfo->query);
SMeterObj *pObj = pQInfo->pObj;
......@@ -582,13 +605,11 @@ void vnodeQueryData(SSchedMsg *pMsg) {
tclose(pQInfo->query.lfd);
}
/* reset QInfo signature */
dTrace("QInfo:%p reset signature", pQInfo);
TSDB_QINFO_RESET_SIG(pQInfo);
sem_post(&pQInfo->dataReady);
vnodeDecRefCount(pQInfo);
}
void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
SQueryMeterMsg *pQueryMsg, int32_t *code) {
SQInfo *pQInfo;
SQuery *pQuery;
......@@ -659,6 +680,7 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp
}
if (pQInfo->over == 1) {
vnodeAddRefCount(pQInfo); // for retrieve procedure
return pQInfo;
}
......@@ -667,15 +689,20 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp
schedMsg.fp = vnodeQueryData;
}
// set in query flag
pQInfo->signature = TSDB_QINFO_QUERY_FLAG;
/*
* The reference count, which is 2, is for both the current query thread and the future retrieve request,
* which will always be issued by client to acquire data or free SQInfo struct.
*/
vnodeAddRefCount(pQInfo);
vnodeAddRefCount(pQInfo);
schedMsg.msg = NULL;
schedMsg.thandle = (void *)1;
schedMsg.ahandle = pQInfo;
dTrace("QInfo:%p set query flag and prepare runtime environment completed, wait for schedule", pQInfo);
dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
pQInfo->refCount);
taosScheduleTask(queryQhandle, &schedMsg);
return pQInfo;
......@@ -775,12 +802,13 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
goto _error;
}
vnodeAddRefCount(pQInfo);
if (pQInfo->over == 1) {
return pQInfo;
}
pQInfo->signature = TSDB_QINFO_QUERY_FLAG;
vnodeAddRefCount(pQInfo);
schedMsg.msg = NULL;
schedMsg.thandle = (void *)1;
schedMsg.ahandle = pQInfo;
......@@ -860,21 +888,15 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
pQInfo->pointsRead);
if (pQInfo->over == 0) {
//dTrace("QInfo:%p set query flag, oldSig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
dTrace("QInfo:%p set query flag, oldSig:%p", pQInfo, pQInfo->signature);
uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo);
/*
* If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo,
* since in release function, the original value has been destroyed. However, this memory area may be reused
* by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo.
*/
if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) {
dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature);
dTrace("QInfo:%p set query flag, sig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
if (pQInfo->killed == 1) {
dTrace("%p freed or killed, abort query", pQInfo);
} else {
vnodeAddRefCount(pQInfo);
dTrace("%p add query into task queue for schedule", pQInfo);
SSchedMsg schedMsg;
SSchedMsg schedMsg = {0};
if (pQInfo->pMeterQuerySupporter != NULL) {
if (pQInfo->pMeterQuerySupporter->pSidSet == NULL) {
......
......@@ -376,7 +376,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (QUERY_IS_STABLE_QUERY(pQueryMsg->queryType)) {
pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code);
} else {
pObj->qhandle = vnodeQueryInTimeRange(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code);
pObj->qhandle = vnodeQueryOnSingleTable(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code);
}
_query_over:
......@@ -470,9 +470,11 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
pMsg += size;
msgLen = pMsg - pStart;
assert(code != TSDB_CODE_ACTION_IN_PROGRESS);
if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) {
dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code);
vnodeFreeQInfo(pObj->qhandle, true);
vnodeDecRefCount(pObj->qhandle);
pObj->qhandle = NULL;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册