diff --git a/src/system/detail/inc/vnode.h b/src/system/detail/inc/vnode.h index 6e366b9b7fbfb0f5b6ad4d9c64d520c358a85e39..4ac08484274ad1cfaacc027f9f7f484a98d1f8e9 100644 --- a/src/system/detail/inc/vnode.h +++ b/src/system/detail/inc/vnode.h @@ -213,18 +213,20 @@ typedef struct { * Only the QInfo.signature == QInfo, this structure can be released safely. */ #define TSDB_QINFO_QUERY_FLAG 0x1 -#define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x)) -#define TSDB_QINFO_SET_QUERY_FLAG(x) \ - atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG); +//#define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x)) +#define TSDB_QINFO_RESET_SIG(x) +#define TSDB_QINFO_SET_QUERY_FLAG(x) +//#define TSDB_QINFO_SET_QUERY_FLAG(x) \ +// atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG); // live lock: wait for query reaching a safe-point, release all resources // belongs to this query #define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \ - { \ - while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \ - taosMsleep(1); \ - } \ - } +// { \ +// while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \ +// taosMsleep(1); \ +// } \ +// } struct tSQLBinaryExpr; @@ -370,6 +372,8 @@ void vnodeFreeQInfo(void *, bool); void vnodeFreeQInfoInQueue(void *param); bool vnodeIsQInfoValid(void *param); +void vnodeDecRefCount(void *param); +void vnodeAddRefCount(void *param); int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQuery); diff --git a/src/system/detail/inc/vnodeRead.h b/src/system/detail/inc/vnodeRead.h index d059075142126545a84c9cd60c0b14e1b79c5d7a..3a1b6f7949e398eb547c3c6bfec79bc82869e1fb 100644 --- a/src/system/detail/inc/vnodeRead.h +++ b/src/system/detail/inc/vnodeRead.h @@ -52,7 +52,7 @@ typedef struct SQueryLoadCompBlockInfo { * the header file info for one vnode */ typedef struct SHeaderFileInfo { - int32_t fileID; // file id + int32_t fileID; // file id } SHeaderFileInfo; typedef struct SQueryCostSummary { @@ -99,17 +99,17 @@ typedef struct SQueryFilesInfo { uint32_t numOfFiles; // the total available number of files for this virtual node during query execution int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap. int32_t vnodeId; - - int32_t headerFd; // header file fd - char* pHeaderFileData; // mmap header files - int64_t headFileSize; - int32_t dataFd; - int32_t lastFd; - - char headerFilePath[PATH_MAX]; // current opened header file name - char dataFilePath[PATH_MAX]; // current opened data file name - char lastFilePath[PATH_MAX]; // current opened last file path - char dbFilePathPrefix[PATH_MAX]; + + int32_t headerFd; // header file fd + char* pHeaderFileData; // mmap header files + int64_t headFileSize; + int32_t dataFd; + int32_t lastFd; + + char headerFilePath[PATH_MAX]; // current opened header file name + char dataFilePath[PATH_MAX]; // current opened data file name + char lastFilePath[PATH_MAX]; // current opened last file path + char dbFilePathPrefix[PATH_MAX]; } SQueryFilesInfo; typedef struct RuntimeEnvironment { @@ -129,17 +129,17 @@ typedef struct RuntimeEnvironment { SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */ SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */ SQueryFilesInfo vnodeFileInfo; - int16_t numOfRowsPerPage; - int16_t offset[TSDB_MAX_COLUMNS]; - int16_t scanFlag; // denotes reversed scan of data or not - SInterpolationInfo interpoInfo; - SData** pInterpoBuf; - SOutputRes* pResult; // reference to SQuerySupporter->pResult - void* hashList; - int32_t usedIndex; // assigned SOutputRes in list - STSBuf* pTSBuf; - STSCursor cur; - SQueryCostSummary summary; + int16_t numOfRowsPerPage; + int16_t offset[TSDB_MAX_COLUMNS]; + int16_t scanFlag; // denotes reversed scan of data or not + SInterpolationInfo interpoInfo; + SData** pInterpoBuf; + SOutputRes* pResult; // reference to SQuerySupporter->pResult + void* hashList; + int32_t usedIndex; // assigned SOutputRes in list + STSBuf* pTSBuf; + STSCursor cur; + SQueryCostSummary summary; } SQueryRuntimeEnv; /* intermediate result during multimeter query involves interval */ @@ -214,14 +214,12 @@ typedef struct SMeterQuerySupportObj { SMeterDataInfo* pMeterDataInfo; - TSKEY* tsList; - int32_t tsNum; - + TSKEY* tsList; } SMeterQuerySupportObj; typedef struct _qinfo { - uint64_t signature; - + uint64_t signature; + int32_t refCount; // QInfo reference count, when the value is 0, it can be released safely char user[TSDB_METER_ID_LEN + 1]; char sql[TSDB_SHOW_SQL_LEN]; uint8_t stream; @@ -231,24 +229,21 @@ typedef struct _qinfo { int64_t useconds; int killed; struct _qinfo *prev, *next; + SQuery query; + int num; + int totalPoints; + int pointsRead; + int pointsReturned; + int pointsInterpo; + int code; + char bufIndex; + char changed; + char over; + SMeterObj* pObj; + sem_t dataReady; - SQuery query; - int num; - int totalPoints; - int pointsRead; - int pointsReturned; - int pointsInterpo; - int code; - char bufIndex; - char changed; - char over; - SMeterObj* pObj; - - int (*fp)(SMeterObj*, SQuery*); - - sem_t dataReady; SMeterQuerySupportObj* pMeterQuerySupporter; - + int (*fp)(SMeterObj*, SQuery*); } SQInfo; int32_t vnodeQuerySingleMeterPrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, SMeterQuerySupportObj* pSMultiMeterObj, diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 6fa0891baa4f49cf5e5cb3ad74095aabd94bc94e..f39ca0dea1c63821f0cca023a4b9e77a23f64462 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -890,12 +890,16 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); + + vnodeDecRefCount(pQInfo); return; } pSupporter->pMeterDataInfo = (SMeterDataInfo *)calloc(1, sizeof(SMeterDataInfo) * pSupporter->numOfMeters); if (pSupporter->pMeterDataInfo == NULL) { dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno)); + pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY; + vnodeDecRefCount(pQInfo); return; } @@ -912,6 +916,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { // failed to save all intermediate results into disk, abort further query processing if (doCloseAllOpenedResults(pSupporter) != TSDB_CODE_SUCCESS) { dError("QInfo:%p failed to save intermediate results, abort further query processing", pQInfo); + vnodeDecRefCount(pQInfo); return; } @@ -919,6 +924,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { if (isQueryKilled(pQuery)) { dTrace("QInfo:%p query killed, abort", pQInfo); + vnodeDecRefCount(pQInfo); return; } @@ -940,6 +946,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { pQInfo->pointsRead += pQuery->pointsRead; dTrace("QInfo:%p points returned:%d, totalRead:%d totalReturn:%d", pQInfo, pQuery->pointsRead, pQInfo->pointsRead, pQInfo->pointsReturned); + vnodeDecRefCount(pQInfo); } /* @@ -1174,10 +1181,13 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { if (pQInfo->killed) { TSDB_QINFO_RESET_SIG(pQInfo); dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); + vnodeDecRefCount(pQInfo); + return; } - assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); + assert(pQInfo->refCount >= 1); +// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); SQuery * pQuery = &pQInfo->query; SMeterObj *pMeterObj = pQInfo->pObj; @@ -1215,6 +1225,7 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { sem_post(&pQInfo->dataReady); TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); return; } @@ -1237,6 +1248,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { sem_post(&pQInfo->dataReady); TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); + return; } } @@ -1249,7 +1262,7 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter); sem_post(&pQInfo->dataReady); TSDB_QINFO_RESET_SIG(pQInfo); - + vnodeDecRefCount(pQInfo); return; } @@ -1284,8 +1297,10 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); } - sem_post(&pQInfo->dataReady); TSDB_QINFO_RESET_SIG(pQInfo); + sem_post(&pQInfo->dataReady); + + vnodeDecRefCount(pQInfo); } void vnodeMultiMeterQuery(SSchedMsg *pMsg) { @@ -1297,11 +1312,13 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { if (pQInfo->killed) { TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); return; } - assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); + assert(pQInfo->refCount >= 1); +// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); SQuery *pQuery = &pQInfo->query; pQuery->pointsRead = 0; @@ -1337,4 +1354,5 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { sem_post(&pQInfo->dataReady); TSDB_QINFO_RESET_SIG(pQInfo); + vnodeDecRefCount(pQInfo); } diff --git a/src/system/detail/src/vnodeRead.c b/src/system/detail/src/vnodeRead.c index 21e83c5198decf552d2a30fed486465f4a4402d2..2ea41d9d58cad91d8b19fa48a0904b9bb1365a77 100644 --- a/src/system/detail/src/vnodeRead.c +++ b/src/system/detail/src/vnodeRead.c @@ -392,10 +392,10 @@ __clean_memory: return NULL; } -static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) { - SQInfo *pQInfo = (SQInfo *)pMsg->ahandle; - vnodeFreeQInfo(pQInfo, true); -} +//static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) { +// SQInfo *pQInfo = (SQInfo *)pMsg->ahandle; +// vnodeFreeQInfo(pQInfo, true); +//} void vnodeFreeQInfoInQueue(void *param) { SQInfo *pQInfo = (SQInfo *)param; @@ -403,15 +403,18 @@ void vnodeFreeQInfoInQueue(void *param) { if (!vnodeIsQInfoValid(pQInfo)) return; pQInfo->killed = 1; - - dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo); - SSchedMsg schedMsg = {0}; - schedMsg.fp = vnodeFreeQInfoInQueueImpl; - - schedMsg.msg = NULL; - schedMsg.thandle = (void *)1; - schedMsg.ahandle = param; - taosScheduleTask(queryQhandle, &schedMsg); + dTrace("QInfo:%p set kill flag to free QInfo"); + + vnodeDecRefCount(pQInfo); + +// dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo); +// SSchedMsg schedMsg = {0}; +// schedMsg.fp = vnodeFreeQInfoInQueueImpl; + +// schedMsg.msg = NULL; +// schedMsg.thandle = (void *)1; +// schedMsg.ahandle = param; +// taosScheduleTask(queryQhandle, &schedMsg); } void vnodeFreeQInfo(void *param, bool decQueryRef) { @@ -499,7 +502,30 @@ bool vnodeIsQInfoValid(void *param) { * into local variable, then compare by using local variable */ uint64_t sig = pQInfo->signature; - return (sig == (uint64_t)pQInfo) || (sig == TSDB_QINFO_QUERY_FLAG); +// return (sig == (uint64_t)pQInfo) || (sig == TSDB_QINFO_QUERY_FLAG); + return (sig == (uint64_t)pQInfo); +} + +void vnodeDecRefCount(void *param) { + SQInfo *pQInfo = (SQInfo*) param; + + assert(vnodeIsQInfoValid(pQInfo)); + + int32_t ref = atomic_sub_fetch_32(&pQInfo->refCount, 1); + dTrace("QInfo:%p decrease obj refcount, %d", pQInfo, ref); + + if (ref == 0) { + vnodeFreeQInfo(pQInfo, true); + } +} + +void vnodeAddRefCount(void *param) { + SQInfo *pQInfo = (SQInfo*) param; + + assert(vnodeIsQInfoValid(pQInfo)); + + int32_t ref = atomic_add_fetch_32(&pQInfo->refCount, 1); + dTrace("QInfo:%p add refcount, %d", pQInfo, ref); } void vnodeQueryData(SSchedMsg *pMsg) { @@ -511,10 +537,11 @@ void vnodeQueryData(SSchedMsg *pMsg) { if (pQInfo->killed) { TSDB_QINFO_RESET_SIG(pQInfo); dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); + vnodeDecRefCount(pQInfo); return; } - assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); +// assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); pQuery = &(pQInfo->query); SMeterObj *pObj = pQInfo->pObj; @@ -586,6 +613,7 @@ void vnodeQueryData(SSchedMsg *pMsg) { dTrace("QInfo:%p reset signature", pQInfo); TSDB_QINFO_RESET_SIG(pQInfo); sem_post(&pQInfo->dataReady); + vnodeDecRefCount(pQInfo); } void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, @@ -668,14 +696,22 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp } // set in query flag - pQInfo->signature = TSDB_QINFO_QUERY_FLAG; - +// pQInfo->signature = TSDB_QINFO_QUERY_FLAG; + + /* + * The reference count, which is 2, is for both the current query thread and the future retrieve request, + * which will always be issued by client to acquire data or free SQInfo struct. + */ + vnodeAddRefCount(pQInfo); + vnodeAddRefCount(pQInfo); + schedMsg.msg = NULL; schedMsg.thandle = (void *)1; schedMsg.ahandle = pQInfo; - dTrace("QInfo:%p set query flag and prepare runtime environment completed, wait for schedule", pQInfo); - + dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo, + pQInfo->refCount); + taosScheduleTask(queryQhandle, &schedMsg); return pQInfo; @@ -779,8 +815,10 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE return pQInfo; } - pQInfo->signature = TSDB_QINFO_QUERY_FLAG; - +// pQInfo->signature = TSDB_QINFO_QUERY_FLAG; + vnodeAddRefCount(pQInfo); + vnodeAddRefCount(pQInfo); + schedMsg.msg = NULL; schedMsg.thandle = (void *)1; schedMsg.ahandle = pQInfo; @@ -862,19 +900,22 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) { if (pQInfo->over == 0) { //dTrace("QInfo:%p set query flag, oldSig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__); dTrace("QInfo:%p set query flag, oldSig:%p", pQInfo, pQInfo->signature); - uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo); +// uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo); /* * If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo, * since in release function, the original value has been destroyed. However, this memory area may be reused * by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo. */ - if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) { - dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature); +// if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) { + if (pQInfo->killed == 1) { +// dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature); + dTrace("%p freed or killed, abort query", pQInfo); } else { + vnodeAddRefCount(pQInfo); dTrace("%p add query into task queue for schedule", pQInfo); - - SSchedMsg schedMsg; + + SSchedMsg schedMsg = {0}; if (pQInfo->pMeterQuerySupporter != NULL) { if (pQInfo->pMeterQuerySupporter->pSidSet == NULL) { diff --git a/src/system/detail/src/vnodeShell.c b/src/system/detail/src/vnodeShell.c index cfdd9567683a55508bb32cc7a0ea208ccbddbe66..6ff3f4d27ca3e238574eaefcd7751f8e0ce71726 100644 --- a/src/system/detail/src/vnodeShell.c +++ b/src/system/detail/src/vnodeShell.c @@ -470,9 +470,11 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { pMsg += size; msgLen = pMsg - pStart; + assert(code != TSDB_CODE_ACTION_IN_PROGRESS); + if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); - vnodeFreeQInfo(pObj->qhandle, true); + vnodeDecRefCount(pObj->qhandle); pObj->qhandle = NULL; }