diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index 375e0066b1b2ad4ab2692552110bbdade3808fdc..a94b308e87a784e55a01f2d19c0c7cba60b3beeb 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -528,8 +528,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { numOfFetch++; } } else { - if ((pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i])) && - tscProjectionQueryOnTable(&pSql->cmd)) || (pRes->numOfRows == 0)) { + if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i]))) { numOfFetch++; } } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 17d870fcb22d64589fa610ab27490a595312b35a..40399d85b742b080bf35012534fa733024a6b956 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -1290,6 +1290,12 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu SSqlObj *pNew = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj); if (pNew != NULL) { // the sub query of two-stage super table query pNew->cmd.type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; + assert(pNew->cmd.numOfTables == 1); + + //launch subquery for each vnode, so the subquery index equals to the vnodeIndex. + SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0); + pMeterMetaInfo->vnodeIndex = trsupport->subqueryIndex; + pSql->pSubs[trsupport->subqueryIndex] = pNew; } @@ -1507,7 +1513,6 @@ int tscBuildQueryMsg(SSqlObj *pSql) { pQueryMsg->uid = pMeterMeta->uid; pQueryMsg->numOfTagsCols = 0; } else { // query on metric - SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta; if (pMeterMetaInfo->vnodeIndex < 0) { tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex); return -1; @@ -2872,15 +2877,14 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) { pElem->groupbyTagColumnList = htonl(offset); for (int32_t j = 0; j < pCmd->groupbyExpr.numOfGroupCols; ++j) { SColIndexEx *pCol = &pCmd->groupbyExpr.columnInfo[j]; - - *((int16_t *)pMsg) = pCol->colId; - pMsg += sizeof(pCol->colId); - - *((int16_t *)pMsg) += pCol->colIdx; - pMsg += sizeof(pCol->colIdx); - - *((int16_t *)pMsg) += pCol->flag; - pMsg += sizeof(pCol->flag); + SColIndexEx* pDestCol = (SColIndexEx*) pMsg; + + pDestCol->colIdxInBuf = 0; + pDestCol->colIdx = htons(pCol->colIdx); + pDestCol->colId = htons(pDestCol->colId); + pDestCol->flag = htons(pDestCol->flag); + + pMsg += sizeof(SColIndexEx); } } } diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 9876dad90667daa013d6dc1d7de7eb93df74b72b..4b0df767848b3117f0d4e382c5cb88b6843fc8ef 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1706,12 +1706,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } pNew->fp = fp; - pNew->param = param; - SMeterMetaInfo* pMetermetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex); - + char key[TSDB_MAX_TAGS_LEN + 1] = {0}; - tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid); + tscGetMetricMetaCacheKey(pCmd, key, uid); #ifdef _DEBUG_VIEW printf("the metricmeta key is:%s\n", key); @@ -1736,7 +1734,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void } assert(pFinalInfo->pMeterMeta != NULL); - if (UTIL_METER_IS_METRIC(pMetermetaInfo)) { + if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) { assert(pFinalInfo->pMetricMeta != NULL); } diff --git a/src/system/detail/inc/vnode.h b/src/system/detail/inc/vnode.h index 6e366b9b7fbfb0f5b6ad4d9c64d520c358a85e39..3a5e7e7260f63c3c6e4e8a9f2c3629a027fa0224 100644 --- a/src/system/detail/inc/vnode.h +++ b/src/system/detail/inc/vnode.h @@ -206,26 +206,6 @@ typedef struct { char cont[]; } SVMsgHeader; -/* - * The value of QInfo.signature is used to denote that a query is executing, it isn't safe to release QInfo yet. - * The release operations will be blocked in a busy-waiting until the query operation reach a safepoint. - * Then it will reset the signature in a atomic operation, followed by release operation. - * Only the QInfo.signature == QInfo, this structure can be released safely. - */ -#define TSDB_QINFO_QUERY_FLAG 0x1 -#define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x)) -#define TSDB_QINFO_SET_QUERY_FLAG(x) \ - atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG); - -// live lock: wait for query reaching a safe-point, release all resources -// belongs to this query -#define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \ - { \ - while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \ - taosMsleep(1); \ - } \ - } - struct tSQLBinaryExpr; typedef struct SColumnInfoEx { @@ -337,7 +317,7 @@ extern void * vnodeTmrCtrl; // read API extern int (*vnodeSearchKeyFunc[])(char *pValue, int num, TSKEY key, int order); -void *vnodeQueryInTimeRange(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *sqlExprs, +void *vnodeQueryOnSingleTable(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *sqlExprs, SQueryMeterMsg *pQueryMsg, int *code); void *vnodeQueryOnMultiMeters(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, @@ -370,6 +350,8 @@ void vnodeFreeQInfo(void *, bool); void vnodeFreeQInfoInQueue(void *param); bool vnodeIsQInfoValid(void *param); +void vnodeDecRefCount(void *param); +void vnodeAddRefCount(void *param); int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQuery); diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index d059075142126545a84c9cd60c0b14e1b79c5d7a..3a1b6f7949e398eb547c3c6bfec79bc82869e1fb 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -52,7 +52,7 @@ typedef struct SQueryLoadCompBlockInfo { * the header file info for one vnode */ typedef struct SHeaderFileInfo { - int32_t fileID; // file id + int32_t fileID; // file id } SHeaderFileInfo; typedef struct SQueryCostSummary { @@ -99,17 +99,17 @@ typedef struct SQueryFilesInfo { uint32_t numOfFiles; // the total available number of files for this virtual node during query execution int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap. int32_t vnodeId; - - int32_t headerFd; // header file fd - char* pHeaderFileData; // mmap header files - int64_t headFileSize; - int32_t dataFd; - int32_t lastFd; - - char headerFilePath[PATH_MAX]; // current opened header file name - char dataFilePath[PATH_MAX]; // current opened data file name - char lastFilePath[PATH_MAX]; // current opened last file path - char dbFilePathPrefix[PATH_MAX]; + + int32_t headerFd; // header file fd + char* pHeaderFileData; // mmap header files + int64_t headFileSize; + int32_t dataFd; + int32_t lastFd; + + char headerFilePath[PATH_MAX]; // current opened header file name + char dataFilePath[PATH_MAX]; // current opened data file name + char lastFilePath[PATH_MAX]; // current opened last file path + char dbFilePathPrefix[PATH_MAX]; } SQueryFilesInfo; typedef struct RuntimeEnvironment { @@ -129,17 +129,17 @@ typedef struct RuntimeEnvironment { SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */ SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */ SQueryFilesInfo vnodeFileInfo; - int16_t numOfRowsPerPage; - int16_t offset[TSDB_MAX_COLUMNS]; - int16_t scanFlag; // denotes reversed scan of data or not - SInterpolationInfo interpoInfo; - SData** pInterpoBuf; - SOutputRes* pResult; // reference to SQuerySupporter->pResult - void* hashList; - int32_t usedIndex; // assigned SOutputRes in list - STSBuf* pTSBuf; - STSCursor cur; - SQueryCostSummary summary; + int16_t numOfRowsPerPage; + int16_t offset[TSDB_MAX_COLUMNS]; + int16_t scanFlag; // denotes reversed scan of data or not + SInterpolationInfo interpoInfo; + SData** pInterpoBuf; + SOutputRes* pResult; // reference to SQuerySupporter->pResult + void* hashList; + int32_t usedIndex; // assigned SOutputRes in list + STSBuf* pTSBuf; + STSCursor cur; + SQueryCostSummary summary; } SQueryRuntimeEnv; /* intermediate result during multimeter query involves interval */ @@ -214,14 +214,12 @@ typedef struct SMeterQuerySupportObj { SMeterDataInfo* pMeterDataInfo; - TSKEY* tsList; - int32_t tsNum; - + TSKEY* tsList; } SMeterQuerySupportObj; typedef struct _qinfo { - uint64_t signature; - + uint64_t signature; + int32_t refCount; // QInfo reference count, when the value is 0, it can be released safely char user[TSDB_METER_ID_LEN + 1]; char sql[TSDB_SHOW_SQL_LEN]; uint8_t stream; @@ -231,24 +229,21 @@ typedef struct _qinfo { int64_t useconds; int killed; struct _qinfo *prev, *next; + SQuery query; + int num; + int totalPoints; + int pointsRead; + int pointsReturned; + int pointsInterpo; + int code; + char bufIndex; + char changed; + char over; + SMeterObj* pObj; + sem_t dataReady; - SQuery query; - int num; - int totalPoints; - int pointsRead; - int pointsReturned; - int pointsInterpo; - int code; - char bufIndex; - char changed; - char over; - SMeterObj* pObj; - - int (*fp)(SMeterObj*, SQuery*); - - sem_t dataReady; SMeterQuerySupportObj* pMeterQuerySupporter; - + int (*fp)(SMeterObj*, SQuery*); } SQInfo; int32_t vnodeQuerySingleMeterPrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, SMeterQuerySupportObj* pSMultiMeterObj, diff --git a/src/system/detail/src/mgmtMeter.c b/src/system/detail/src/mgmtMeter.c index 5ef9f61b0bbc34abb6db54af027d14bc21379224..be141f278ddb6cac0c2bc98abee4c350d1974667 100644 --- a/src/system/detail/src/mgmtMeter.c +++ b/src/system/detail/src/mgmtMeter.c @@ -1094,9 +1094,12 @@ static SMetricMetaElemMsg *doConvertMetricMetaMsg(SMetricMetaMsg *pMetricMetaMsg pElem->groupbyTagColumnList = htonl(pElem->groupbyTagColumnList); - int16_t *groupColIds = (int16_t*) (((char *)pMetricMetaMsg) + pElem->groupbyTagColumnList); + SColIndexEx *groupColIds = (SColIndexEx*) (((char *)pMetricMetaMsg) + pElem->groupbyTagColumnList); for (int32_t i = 0; i < pElem->numOfGroupCols; ++i) { - groupColIds[i] = htons(groupColIds[i]); + groupColIds[i].colId = htons(groupColIds[i].colId); + groupColIds[i].colIdx = htons(groupColIds[i].colIdx); + groupColIds[i].flag = htons(groupColIds[i].flag); + groupColIds[i].colIdxInBuf = 0; } return pElem; diff --git a/src/system/detail/src/mgmtSupertableQuery.c b/src/system/detail/src/mgmtSupertableQuery.c index b56173f14f2ade67920962d2a7fb1995029f2952..10a64408ce558df441ea215b3e84cb6698b512b2 100644 --- a/src/system/detail/src/mgmtSupertableQuery.c +++ b/src/system/detail/src/mgmtSupertableQuery.c @@ -784,14 +784,15 @@ int mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pMsg, int32_t tableIndex, tQuer } // todo refactor!!!!! -static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, void* param) { +static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, int32_t len, char* param) { if (offset == TSDB_TBNAME_COLUMN_INDEX) { extractMeterName(pMeter->meterId, param); - return param; } else { - char* tags = pMeter->pTagData + TSDB_METER_ID_LEN; // tag start position - return (tags + offset); + char* tags = pMeter->pTagData + offset + TSDB_METER_ID_LEN; // tag start position + memcpy(param, tags, len); // make sure the value is null-terminated string } + + return param; } bool tSkipListNodeFilterCallback(const void* pNode, void* param) { @@ -799,8 +800,9 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) { tQueryInfo* pInfo = (tQueryInfo*)param; STabObj* pMeter = (STabObj*)(((tSkipListNode*)pNode)->pData); - char name[TSDB_METER_NAME_LEN + 1] = {0}; - char* val = getTagValueFromMeter(pMeter, pInfo->offset, name); + char buf[TSDB_MAX_TAGS_LEN] = {0}; + + char* val = getTagValueFromMeter(pMeter, pInfo->offset, pInfo->sch.bytes, buf); int8_t type = pInfo->sch.type; int32_t ret = 0; diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 6fa0891baa4f49cf5e5cb3ad74095aabd94bc94e..a545645a0967d5f2f12ebfe412cd884cc8fb24f2 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -896,6 +896,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { pSupporter->pMeterDataInfo = (SMeterDataInfo *)calloc(1, sizeof(SMeterDataInfo) * pSupporter->numOfMeters); if (pSupporter->pMeterDataInfo == NULL) { dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno)); + pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; return; } @@ -1172,12 +1173,13 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { } if (pQInfo->killed) { - TSDB_QINFO_RESET_SIG(pQInfo); - dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); + dTrace("QInfo:%p it is already killed, abort", pQInfo); + vnodeDecRefCount(pQInfo); + return; } - assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); + assert(pQInfo->refCount >= 1); SQuery * pQuery = &pQInfo->query; SMeterObj *pMeterObj = pQInfo->pObj; @@ -1211,10 +1213,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); - dTrace("QInfo:%p reset signature", pQInfo); - sem_post(&pQInfo->dataReady); - TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); return; } @@ -1233,23 +1233,22 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); - dTrace("QInfo:%p reset signature", pQInfo); - sem_post(&pQInfo->dataReady); - TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); + return; } } } pQInfo->over = 1; - dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned, reset signature", pQInfo, + dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo->pointsRead); vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter); sem_post(&pQInfo->dataReady); - TSDB_QINFO_RESET_SIG(pQInfo); - + + vnodeDecRefCount(pQInfo); return; } @@ -1277,15 +1276,15 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { /* check if query is killed or not */ if (isQueryKilled(pQuery)) { - dTrace("QInfo:%p query is killed, reset signature", pQInfo); + dTrace("QInfo:%p query is killed", pQInfo); pQInfo->over = 1; } else { - dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned, reset signature", + dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); } sem_post(&pQInfo->dataReady); - TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); } void vnodeMultiMeterQuery(SSchedMsg *pMsg) { @@ -1296,12 +1295,12 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { } if (pQInfo->killed) { - TSDB_QINFO_RESET_SIG(pQInfo); - dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); + vnodeDecRefCount(pQInfo); + dTrace("QInfo:%p it is already killed, abort", pQInfo); return; } - assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); + assert(pQInfo->refCount >= 1); SQuery *pQuery = &pQInfo->query; pQuery->pointsRead = 0; @@ -1322,7 +1321,6 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { pQInfo->useconds += (taosGetTimestampUs() - st); pQInfo->over = isQueryKilled(pQuery) ? 1 : 0; - dTrace("QInfo:%p reset signature", pQInfo); taosInterpoSetStartInfo(&pQInfo->pMeterQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead, pQInfo->query.interpoType); @@ -1330,11 +1328,11 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { if (pQuery->pointsRead == 0) { pQInfo->over = 1; - dTrace("QInfo:%p over, %d meters queried, %d points are returned, reset signature", pQInfo, pSupporter->numOfMeters, + dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters, pQInfo->pointsRead); vnodePrintQueryStatistics(pSupporter); } sem_post(&pQInfo->dataReady); - TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); } diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 21e83c5198decf552d2a30fed486465f4a4402d2..ccd59f356b18eee22d8fe8afd1cf82da32b97d8e 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -392,10 +392,10 @@ __clean_memory: return NULL; } -static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) { - SQInfo *pQInfo = (SQInfo *)pMsg->ahandle; - vnodeFreeQInfo(pQInfo, true); -} +//static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) { +// SQInfo *pQInfo = (SQInfo *)pMsg->ahandle; +// vnodeFreeQInfo(pQInfo, true); +//} void vnodeFreeQInfoInQueue(void *param) { SQInfo *pQInfo = (SQInfo *)param; @@ -403,15 +403,18 @@ void vnodeFreeQInfoInQueue(void *param) { if (!vnodeIsQInfoValid(pQInfo)) return; pQInfo->killed = 1; - - dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo); - SSchedMsg schedMsg = {0}; - schedMsg.fp = vnodeFreeQInfoInQueueImpl; - - schedMsg.msg = NULL; - schedMsg.thandle = (void *)1; - schedMsg.ahandle = param; - taosScheduleTask(queryQhandle, &schedMsg); + dTrace("QInfo:%p set kill flag to free QInfo"); + + vnodeDecRefCount(pQInfo); + +// dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo); +// SSchedMsg schedMsg = {0}; +// schedMsg.fp = vnodeFreeQInfoInQueueImpl; + +// schedMsg.msg = NULL; +// schedMsg.thandle = (void *)1; +// schedMsg.ahandle = param; +// taosScheduleTask(queryQhandle, &schedMsg); } void vnodeFreeQInfo(void *param, bool decQueryRef) { @@ -419,8 +422,6 @@ void vnodeFreeQInfo(void *param, bool decQueryRef) { if (!vnodeIsQInfoValid(param)) return; pQInfo->killed = 1; - TSDB_WAIT_TO_SAFE_DROP_QINFO(pQInfo); - SMeterObj *pObj = pQInfo->pObj; dTrace("QInfo:%p start to free SQInfo", pQInfo); @@ -499,7 +500,30 @@ bool vnodeIsQInfoValid(void *param) { * into local variable, then compare by using local variable */ uint64_t sig = pQInfo->signature; - return (sig == (uint64_t)pQInfo) || (sig == TSDB_QINFO_QUERY_FLAG); + return (sig == (uint64_t)pQInfo); +} + +void vnodeDecRefCount(void *param) { + SQInfo *pQInfo = (SQInfo*) param; + + assert(vnodeIsQInfoValid(pQInfo)); + + int32_t ref = atomic_sub_fetch_32(&pQInfo->refCount, 1); + assert(ref >= 0); + + dTrace("QInfo:%p decrease obj refcount, %d", pQInfo, ref); + if (ref == 0) { + vnodeFreeQInfo(pQInfo, true); + } +} + +void vnodeAddRefCount(void *param) { + SQInfo *pQInfo = (SQInfo*) param; + + assert(vnodeIsQInfoValid(pQInfo)); + + int32_t ref = atomic_add_fetch_32(&pQInfo->refCount, 1); + dTrace("QInfo:%p add refcount, %d", pQInfo, ref); } void vnodeQueryData(SSchedMsg *pMsg) { @@ -509,12 +533,11 @@ void vnodeQueryData(SSchedMsg *pMsg) { pQInfo = (SQInfo *)pMsg->ahandle; if (pQInfo->killed) { - TSDB_QINFO_RESET_SIG(pQInfo); - dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); + dTrace("QInfo:%p it is already killed, abort", pQInfo); + vnodeDecRefCount(pQInfo); return; } - assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); pQuery = &(pQInfo->query); SMeterObj *pObj = pQInfo->pObj; @@ -582,13 +605,11 @@ void vnodeQueryData(SSchedMsg *pMsg) { tclose(pQInfo->query.lfd); } - /* reset QInfo signature */ - dTrace("QInfo:%p reset signature", pQInfo); - TSDB_QINFO_RESET_SIG(pQInfo); sem_post(&pQInfo->dataReady); + vnodeDecRefCount(pQInfo); } -void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, +void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, SQueryMeterMsg *pQueryMsg, int32_t *code) { SQInfo *pQInfo; SQuery *pQuery; @@ -659,6 +680,7 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp } if (pQInfo->over == 1) { + vnodeAddRefCount(pQInfo); // for retrieve procedure return pQInfo; } @@ -667,15 +689,20 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp schedMsg.fp = vnodeQueryData; } - // set in query flag - pQInfo->signature = TSDB_QINFO_QUERY_FLAG; - + /* + * The reference count, which is 2, is for both the current query thread and the future retrieve request, + * which will always be issued by client to acquire data or free SQInfo struct. + */ + vnodeAddRefCount(pQInfo); + vnodeAddRefCount(pQInfo); + schedMsg.msg = NULL; schedMsg.thandle = (void *)1; schedMsg.ahandle = pQInfo; - dTrace("QInfo:%p set query flag and prepare runtime environment completed, wait for schedule", pQInfo); - + dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo, + pQInfo->refCount); + taosScheduleTask(queryQhandle, &schedMsg); return pQInfo; @@ -775,12 +802,13 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE goto _error; } + vnodeAddRefCount(pQInfo); if (pQInfo->over == 1) { return pQInfo; } - pQInfo->signature = TSDB_QINFO_QUERY_FLAG; - + vnodeAddRefCount(pQInfo); + schedMsg.msg = NULL; schedMsg.thandle = (void *)1; schedMsg.ahandle = pQInfo; @@ -860,21 +888,15 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) { pQInfo->pointsRead); if (pQInfo->over == 0) { - //dTrace("QInfo:%p set query flag, oldSig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__); - dTrace("QInfo:%p set query flag, oldSig:%p", pQInfo, pQInfo->signature); - uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo); - - /* - * If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo, - * since in release function, the original value has been destroyed. However, this memory area may be reused - * by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo. - */ - if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) { - dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature); + dTrace("QInfo:%p set query flag, sig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__); + + if (pQInfo->killed == 1) { + dTrace("%p freed or killed, abort query", pQInfo); } else { + vnodeAddRefCount(pQInfo); dTrace("%p add query into task queue for schedule", pQInfo); - - SSchedMsg schedMsg; + + SSchedMsg schedMsg = {0}; if (pQInfo->pMeterQuerySupporter != NULL) { if (pQInfo->pMeterQuerySupporter->pSidSet == NULL) { diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index cfdd9567683a55508bb32cc7a0ea208ccbddbe66..5efbb41014bb6e039ea4a1a581e8a3a121b37cc6 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -376,7 +376,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { if (QUERY_IS_STABLE_QUERY(pQueryMsg->queryType)) { pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code); } else { - pObj->qhandle = vnodeQueryInTimeRange(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code); + pObj->qhandle = vnodeQueryOnSingleTable(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code); } _query_over: @@ -470,9 +470,11 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { pMsg += size; msgLen = pMsg - pStart; + assert(code != TSDB_CODE_ACTION_IN_PROGRESS); + if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); - vnodeFreeQInfo(pObj->qhandle, true); + vnodeDecRefCount(pObj->qhandle); pObj->qhandle = NULL; }