提交 a5a3fb8f 编写于 作者: S slguan

Merge branch 'develop' into feature/slguan

...@@ -528,8 +528,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) { ...@@ -528,8 +528,7 @@ void tscFetchDatablockFromSubquery(SSqlObj* pSql) {
numOfFetch++; numOfFetch++;
} }
} else { } else {
if ((pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i])) && if (pRes->row >= pRes->numOfRows && (!tscHasReachLimitation(pSql->pSubs[i]))) {
tscProjectionQueryOnTable(&pSql->cmd)) || (pRes->numOfRows == 0)) {
numOfFetch++; numOfFetch++;
} }
} }
......
...@@ -1290,6 +1290,12 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu ...@@ -1290,6 +1290,12 @@ static SSqlObj *tscCreateSqlObjForSubquery(SSqlObj *pSql, SRetrieveSupport *trsu
SSqlObj *pNew = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj); SSqlObj *pNew = createSubqueryObj(pSql, 0, tscRetrieveDataRes, trsupport, prevSqlObj);
if (pNew != NULL) { // the sub query of two-stage super table query if (pNew != NULL) { // the sub query of two-stage super table query
pNew->cmd.type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY; pNew->cmd.type |= TSDB_QUERY_TYPE_STABLE_SUBQUERY;
assert(pNew->cmd.numOfTables == 1);
//launch subquery for each vnode, so the subquery index equals to the vnodeIndex.
SMeterMetaInfo* pMeterMetaInfo = tscGetMeterMetaInfo(&pNew->cmd, 0);
pMeterMetaInfo->vnodeIndex = trsupport->subqueryIndex;
pSql->pSubs[trsupport->subqueryIndex] = pNew; pSql->pSubs[trsupport->subqueryIndex] = pNew;
} }
...@@ -1507,7 +1513,6 @@ int tscBuildQueryMsg(SSqlObj *pSql) { ...@@ -1507,7 +1513,6 @@ int tscBuildQueryMsg(SSqlObj *pSql) {
pQueryMsg->uid = pMeterMeta->uid; pQueryMsg->uid = pMeterMeta->uid;
pQueryMsg->numOfTagsCols = 0; pQueryMsg->numOfTagsCols = 0;
} else { // query on metric } else { // query on metric
SMetricMeta *pMetricMeta = pMeterMetaInfo->pMetricMeta;
if (pMeterMetaInfo->vnodeIndex < 0) { if (pMeterMetaInfo->vnodeIndex < 0) {
tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex); tscError("%p error vnodeIdx:%d", pSql, pMeterMetaInfo->vnodeIndex);
return -1; return -1;
...@@ -2872,15 +2877,14 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) { ...@@ -2872,15 +2877,14 @@ int tscBuildMetricMetaMsg(SSqlObj *pSql) {
pElem->groupbyTagColumnList = htonl(offset); pElem->groupbyTagColumnList = htonl(offset);
for (int32_t j = 0; j < pCmd->groupbyExpr.numOfGroupCols; ++j) { for (int32_t j = 0; j < pCmd->groupbyExpr.numOfGroupCols; ++j) {
SColIndexEx *pCol = &pCmd->groupbyExpr.columnInfo[j]; SColIndexEx *pCol = &pCmd->groupbyExpr.columnInfo[j];
SColIndexEx* pDestCol = (SColIndexEx*) pMsg;
*((int16_t *)pMsg) = pCol->colId;
pMsg += sizeof(pCol->colId); pDestCol->colIdxInBuf = 0;
pDestCol->colIdx = htons(pCol->colIdx);
*((int16_t *)pMsg) += pCol->colIdx; pDestCol->colId = htons(pDestCol->colId);
pMsg += sizeof(pCol->colIdx); pDestCol->flag = htons(pDestCol->flag);
*((int16_t *)pMsg) += pCol->flag; pMsg += sizeof(SColIndexEx);
pMsg += sizeof(pCol->flag);
} }
} }
} }
......
...@@ -1706,12 +1706,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1706,12 +1706,10 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
} }
pNew->fp = fp; pNew->fp = fp;
pNew->param = param; pNew->param = param;
SMeterMetaInfo* pMetermetaInfo = tscGetMeterMetaInfo(pCmd, tableIndex);
char key[TSDB_MAX_TAGS_LEN + 1] = {0}; char key[TSDB_MAX_TAGS_LEN + 1] = {0};
tscGetMetricMetaCacheKey(pCmd, key, pMetermetaInfo->pMeterMeta->uid); tscGetMetricMetaCacheKey(pCmd, key, uid);
#ifdef _DEBUG_VIEW #ifdef _DEBUG_VIEW
printf("the metricmeta key is:%s\n", key); printf("the metricmeta key is:%s\n", key);
...@@ -1736,7 +1734,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void ...@@ -1736,7 +1734,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
} }
assert(pFinalInfo->pMeterMeta != NULL); assert(pFinalInfo->pMeterMeta != NULL);
if (UTIL_METER_IS_METRIC(pMetermetaInfo)) { if (UTIL_METER_IS_METRIC(pMeterMetaInfo)) {
assert(pFinalInfo->pMetricMeta != NULL); assert(pFinalInfo->pMetricMeta != NULL);
} }
......
...@@ -206,26 +206,6 @@ typedef struct { ...@@ -206,26 +206,6 @@ typedef struct {
char cont[]; char cont[];
} SVMsgHeader; } SVMsgHeader;
/*
* The value of QInfo.signature is used to denote that a query is executing, it isn't safe to release QInfo yet.
* The release operations will be blocked in a busy-waiting until the query operation reach a safepoint.
* Then it will reset the signature in a atomic operation, followed by release operation.
* Only the QInfo.signature == QInfo, this structure can be released safely.
*/
#define TSDB_QINFO_QUERY_FLAG 0x1
#define TSDB_QINFO_RESET_SIG(x) atomic_store_64(&((x)->signature), (uint64_t)(x))
#define TSDB_QINFO_SET_QUERY_FLAG(x) \
atomic_val_compare_exchange_64(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG);
// live lock: wait for query reaching a safe-point, release all resources
// belongs to this query
#define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \
{ \
while (atomic_val_compare_exchange_64(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \
taosMsleep(1); \
} \
}
struct tSQLBinaryExpr; struct tSQLBinaryExpr;
typedef struct SColumnInfoEx { typedef struct SColumnInfoEx {
...@@ -337,7 +317,7 @@ extern void * vnodeTmrCtrl; ...@@ -337,7 +317,7 @@ extern void * vnodeTmrCtrl;
// read API // read API
extern int (*vnodeSearchKeyFunc[])(char *pValue, int num, TSKEY key, int order); extern int (*vnodeSearchKeyFunc[])(char *pValue, int num, TSKEY key, int order);
void *vnodeQueryInTimeRange(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *sqlExprs, void *vnodeQueryOnSingleTable(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *sqlExprs,
SQueryMeterMsg *pQueryMsg, int *code); SQueryMeterMsg *pQueryMsg, int *code);
void *vnodeQueryOnMultiMeters(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, void *vnodeQueryOnMultiMeters(SMeterObj **pMeterObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
...@@ -370,6 +350,8 @@ void vnodeFreeQInfo(void *, bool); ...@@ -370,6 +350,8 @@ void vnodeFreeQInfo(void *, bool);
void vnodeFreeQInfoInQueue(void *param); void vnodeFreeQInfoInQueue(void *param);
bool vnodeIsQInfoValid(void *param); bool vnodeIsQInfoValid(void *param);
void vnodeDecRefCount(void *param);
void vnodeAddRefCount(void *param);
int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQuery); int32_t vnodeConvertQueryMeterMsg(SQueryMeterMsg *pQuery);
......
...@@ -52,7 +52,7 @@ typedef struct SQueryLoadCompBlockInfo { ...@@ -52,7 +52,7 @@ typedef struct SQueryLoadCompBlockInfo {
* the header file info for one vnode * the header file info for one vnode
*/ */
typedef struct SHeaderFileInfo { typedef struct SHeaderFileInfo {
int32_t fileID; // file id int32_t fileID; // file id
} SHeaderFileInfo; } SHeaderFileInfo;
typedef struct SQueryCostSummary { typedef struct SQueryCostSummary {
...@@ -99,17 +99,17 @@ typedef struct SQueryFilesInfo { ...@@ -99,17 +99,17 @@ typedef struct SQueryFilesInfo {
uint32_t numOfFiles; // the total available number of files for this virtual node during query execution uint32_t numOfFiles; // the total available number of files for this virtual node during query execution
int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap. int32_t current; // the memory mapped header file, NOTE: only one header file can be mmap.
int32_t vnodeId; int32_t vnodeId;
int32_t headerFd; // header file fd int32_t headerFd; // header file fd
char* pHeaderFileData; // mmap header files char* pHeaderFileData; // mmap header files
int64_t headFileSize; int64_t headFileSize;
int32_t dataFd; int32_t dataFd;
int32_t lastFd; int32_t lastFd;
char headerFilePath[PATH_MAX]; // current opened header file name char headerFilePath[PATH_MAX]; // current opened header file name
char dataFilePath[PATH_MAX]; // current opened data file name char dataFilePath[PATH_MAX]; // current opened data file name
char lastFilePath[PATH_MAX]; // current opened last file path char lastFilePath[PATH_MAX]; // current opened last file path
char dbFilePathPrefix[PATH_MAX]; char dbFilePathPrefix[PATH_MAX];
} SQueryFilesInfo; } SQueryFilesInfo;
typedef struct RuntimeEnvironment { typedef struct RuntimeEnvironment {
...@@ -129,17 +129,17 @@ typedef struct RuntimeEnvironment { ...@@ -129,17 +129,17 @@ typedef struct RuntimeEnvironment {
SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */ SQueryLoadBlockInfo loadBlockInfo; /* record current block load information */
SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */ SQueryLoadCompBlockInfo loadCompBlockInfo; /* record current compblock information in SQuery */
SQueryFilesInfo vnodeFileInfo; SQueryFilesInfo vnodeFileInfo;
int16_t numOfRowsPerPage; int16_t numOfRowsPerPage;
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
int16_t scanFlag; // denotes reversed scan of data or not int16_t scanFlag; // denotes reversed scan of data or not
SInterpolationInfo interpoInfo; SInterpolationInfo interpoInfo;
SData** pInterpoBuf; SData** pInterpoBuf;
SOutputRes* pResult; // reference to SQuerySupporter->pResult SOutputRes* pResult; // reference to SQuerySupporter->pResult
void* hashList; void* hashList;
int32_t usedIndex; // assigned SOutputRes in list int32_t usedIndex; // assigned SOutputRes in list
STSBuf* pTSBuf; STSBuf* pTSBuf;
STSCursor cur; STSCursor cur;
SQueryCostSummary summary; SQueryCostSummary summary;
} SQueryRuntimeEnv; } SQueryRuntimeEnv;
/* intermediate result during multimeter query involves interval */ /* intermediate result during multimeter query involves interval */
...@@ -214,14 +214,12 @@ typedef struct SMeterQuerySupportObj { ...@@ -214,14 +214,12 @@ typedef struct SMeterQuerySupportObj {
SMeterDataInfo* pMeterDataInfo; SMeterDataInfo* pMeterDataInfo;
TSKEY* tsList; TSKEY* tsList;
int32_t tsNum;
} SMeterQuerySupportObj; } SMeterQuerySupportObj;
typedef struct _qinfo { typedef struct _qinfo {
uint64_t signature; uint64_t signature;
int32_t refCount; // QInfo reference count, when the value is 0, it can be released safely
char user[TSDB_METER_ID_LEN + 1]; char user[TSDB_METER_ID_LEN + 1];
char sql[TSDB_SHOW_SQL_LEN]; char sql[TSDB_SHOW_SQL_LEN];
uint8_t stream; uint8_t stream;
...@@ -231,24 +229,21 @@ typedef struct _qinfo { ...@@ -231,24 +229,21 @@ typedef struct _qinfo {
int64_t useconds; int64_t useconds;
int killed; int killed;
struct _qinfo *prev, *next; struct _qinfo *prev, *next;
SQuery query;
int num;
int totalPoints;
int pointsRead;
int pointsReturned;
int pointsInterpo;
int code;
char bufIndex;
char changed;
char over;
SMeterObj* pObj;
sem_t dataReady;
SQuery query;
int num;
int totalPoints;
int pointsRead;
int pointsReturned;
int pointsInterpo;
int code;
char bufIndex;
char changed;
char over;
SMeterObj* pObj;
int (*fp)(SMeterObj*, SQuery*);
sem_t dataReady;
SMeterQuerySupportObj* pMeterQuerySupporter; SMeterQuerySupportObj* pMeterQuerySupporter;
int (*fp)(SMeterObj*, SQuery*);
} SQInfo; } SQInfo;
int32_t vnodeQuerySingleMeterPrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, SMeterQuerySupportObj* pSMultiMeterObj, int32_t vnodeQuerySingleMeterPrepare(SQInfo* pQInfo, SMeterObj* pMeterObj, SMeterQuerySupportObj* pSMultiMeterObj,
......
...@@ -1094,9 +1094,12 @@ static SMetricMetaElemMsg *doConvertMetricMetaMsg(SMetricMetaMsg *pMetricMetaMsg ...@@ -1094,9 +1094,12 @@ static SMetricMetaElemMsg *doConvertMetricMetaMsg(SMetricMetaMsg *pMetricMetaMsg
pElem->groupbyTagColumnList = htonl(pElem->groupbyTagColumnList); pElem->groupbyTagColumnList = htonl(pElem->groupbyTagColumnList);
int16_t *groupColIds = (int16_t*) (((char *)pMetricMetaMsg) + pElem->groupbyTagColumnList); SColIndexEx *groupColIds = (SColIndexEx*) (((char *)pMetricMetaMsg) + pElem->groupbyTagColumnList);
for (int32_t i = 0; i < pElem->numOfGroupCols; ++i) { for (int32_t i = 0; i < pElem->numOfGroupCols; ++i) {
groupColIds[i] = htons(groupColIds[i]); groupColIds[i].colId = htons(groupColIds[i].colId);
groupColIds[i].colIdx = htons(groupColIds[i].colIdx);
groupColIds[i].flag = htons(groupColIds[i].flag);
groupColIds[i].colIdxInBuf = 0;
} }
return pElem; return pElem;
......
...@@ -784,14 +784,15 @@ int mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pMsg, int32_t tableIndex, tQuer ...@@ -784,14 +784,15 @@ int mgmtRetrieveMetersFromMetric(SMetricMetaMsg* pMsg, int32_t tableIndex, tQuer
} }
// todo refactor!!!!! // todo refactor!!!!!
static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, void* param) { static char* getTagValueFromMeter(STabObj* pMeter, int32_t offset, int32_t len, char* param) {
if (offset == TSDB_TBNAME_COLUMN_INDEX) { if (offset == TSDB_TBNAME_COLUMN_INDEX) {
extractMeterName(pMeter->meterId, param); extractMeterName(pMeter->meterId, param);
return param;
} else { } else {
char* tags = pMeter->pTagData + TSDB_METER_ID_LEN; // tag start position char* tags = pMeter->pTagData + offset + TSDB_METER_ID_LEN; // tag start position
return (tags + offset); memcpy(param, tags, len); // make sure the value is null-terminated string
} }
return param;
} }
bool tSkipListNodeFilterCallback(const void* pNode, void* param) { bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
...@@ -799,8 +800,9 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) { ...@@ -799,8 +800,9 @@ bool tSkipListNodeFilterCallback(const void* pNode, void* param) {
tQueryInfo* pInfo = (tQueryInfo*)param; tQueryInfo* pInfo = (tQueryInfo*)param;
STabObj* pMeter = (STabObj*)(((tSkipListNode*)pNode)->pData); STabObj* pMeter = (STabObj*)(((tSkipListNode*)pNode)->pData);
char name[TSDB_METER_NAME_LEN + 1] = {0}; char buf[TSDB_MAX_TAGS_LEN] = {0};
char* val = getTagValueFromMeter(pMeter, pInfo->offset, name);
char* val = getTagValueFromMeter(pMeter, pInfo->offset, pInfo->sch.bytes, buf);
int8_t type = pInfo->sch.type; int8_t type = pInfo->sch.type;
int32_t ret = 0; int32_t ret = 0;
......
...@@ -896,6 +896,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { ...@@ -896,6 +896,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) {
pSupporter->pMeterDataInfo = (SMeterDataInfo *)calloc(1, sizeof(SMeterDataInfo) * pSupporter->numOfMeters); pSupporter->pMeterDataInfo = (SMeterDataInfo *)calloc(1, sizeof(SMeterDataInfo) * pSupporter->numOfMeters);
if (pSupporter->pMeterDataInfo == NULL) { if (pSupporter->pMeterDataInfo == NULL) {
dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno)); dError("QInfo:%p failed to allocate memory, %s", pQInfo, strerror(errno));
pQInfo->code = -TSDB_CODE_SERV_OUT_OF_MEMORY;
return; return;
} }
...@@ -1172,12 +1173,13 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { ...@@ -1172,12 +1173,13 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
} }
if (pQInfo->killed) { if (pQInfo->killed) {
TSDB_QINFO_RESET_SIG(pQInfo); dTrace("QInfo:%p it is already killed, abort", pQInfo);
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); vnodeDecRefCount(pQInfo);
return; return;
} }
assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); assert(pQInfo->refCount >= 1);
SQuery * pQuery = &pQInfo->query; SQuery * pQuery = &pQInfo->query;
SMeterObj *pMeterObj = pQInfo->pObj; SMeterObj *pMeterObj = pQInfo->pObj;
...@@ -1211,10 +1213,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { ...@@ -1211,10 +1213,8 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo, pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, numOfInterpo,
pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned); pQInfo->pointsRead, pQInfo->pointsInterpo, pQInfo->pointsReturned);
dTrace("QInfo:%p reset signature", pQInfo);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo); vnodeDecRefCount(pQInfo);
return; return;
} }
...@@ -1233,23 +1233,22 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { ...@@ -1233,23 +1233,22 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead,
pQInfo->pointsInterpo, pQInfo->pointsReturned); pQInfo->pointsInterpo, pQInfo->pointsReturned);
dTrace("QInfo:%p reset signature", pQInfo);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo); vnodeDecRefCount(pQInfo);
return; return;
} }
} }
} }
pQInfo->over = 1; pQInfo->over = 1;
dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned, reset signature", pQInfo, dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo,
pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo->pointsRead); pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo->pointsRead);
vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter); vnodePrintQueryStatistics(pQInfo->pMeterQuerySupporter);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo);
vnodeDecRefCount(pQInfo);
return; return;
} }
...@@ -1277,15 +1276,15 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) { ...@@ -1277,15 +1276,15 @@ void vnodeSingleMeterQuery(SSchedMsg *pMsg) {
/* check if query is killed or not */ /* check if query is killed or not */
if (isQueryKilled(pQuery)) { if (isQueryKilled(pQuery)) {
dTrace("QInfo:%p query is killed, reset signature", pQInfo); dTrace("QInfo:%p query is killed", pQInfo);
pQInfo->over = 1; pQInfo->over = 1;
} else { } else {
dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned, reset signature", dTrace("QInfo:%p vid:%d sid:%d id:%s, meter query thread completed, %d points are returned",
pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead); pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead);
} }
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo); vnodeDecRefCount(pQInfo);
} }
void vnodeMultiMeterQuery(SSchedMsg *pMsg) { void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
...@@ -1296,12 +1295,12 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { ...@@ -1296,12 +1295,12 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
} }
if (pQInfo->killed) { if (pQInfo->killed) {
TSDB_QINFO_RESET_SIG(pQInfo); vnodeDecRefCount(pQInfo);
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); dTrace("QInfo:%p it is already killed, abort", pQInfo);
return; return;
} }
assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG); assert(pQInfo->refCount >= 1);
SQuery *pQuery = &pQInfo->query; SQuery *pQuery = &pQInfo->query;
pQuery->pointsRead = 0; pQuery->pointsRead = 0;
...@@ -1322,7 +1321,6 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { ...@@ -1322,7 +1321,6 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
pQInfo->useconds += (taosGetTimestampUs() - st); pQInfo->useconds += (taosGetTimestampUs() - st);
pQInfo->over = isQueryKilled(pQuery) ? 1 : 0; pQInfo->over = isQueryKilled(pQuery) ? 1 : 0;
dTrace("QInfo:%p reset signature", pQInfo);
taosInterpoSetStartInfo(&pQInfo->pMeterQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead, taosInterpoSetStartInfo(&pQInfo->pMeterQuerySupporter->runtimeEnv.interpoInfo, pQuery->pointsRead,
pQInfo->query.interpoType); pQInfo->query.interpoType);
...@@ -1330,11 +1328,11 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) { ...@@ -1330,11 +1328,11 @@ void vnodeMultiMeterQuery(SSchedMsg *pMsg) {
if (pQuery->pointsRead == 0) { if (pQuery->pointsRead == 0) {
pQInfo->over = 1; pQInfo->over = 1;
dTrace("QInfo:%p over, %d meters queried, %d points are returned, reset signature", pQInfo, pSupporter->numOfMeters, dTrace("QInfo:%p over, %d meters queried, %d points are returned", pQInfo, pSupporter->numOfMeters,
pQInfo->pointsRead); pQInfo->pointsRead);
vnodePrintQueryStatistics(pSupporter); vnodePrintQueryStatistics(pSupporter);
} }
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
TSDB_QINFO_RESET_SIG(pQInfo); vnodeDecRefCount(pQInfo);
} }
...@@ -392,10 +392,10 @@ __clean_memory: ...@@ -392,10 +392,10 @@ __clean_memory:
return NULL; return NULL;
} }
static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) { //static void vnodeFreeQInfoInQueueImpl(SSchedMsg *pMsg) {
SQInfo *pQInfo = (SQInfo *)pMsg->ahandle; // SQInfo *pQInfo = (SQInfo *)pMsg->ahandle;
vnodeFreeQInfo(pQInfo, true); // vnodeFreeQInfo(pQInfo, true);
} //}
void vnodeFreeQInfoInQueue(void *param) { void vnodeFreeQInfoInQueue(void *param) {
SQInfo *pQInfo = (SQInfo *)param; SQInfo *pQInfo = (SQInfo *)param;
...@@ -403,15 +403,18 @@ void vnodeFreeQInfoInQueue(void *param) { ...@@ -403,15 +403,18 @@ void vnodeFreeQInfoInQueue(void *param) {
if (!vnodeIsQInfoValid(pQInfo)) return; if (!vnodeIsQInfoValid(pQInfo)) return;
pQInfo->killed = 1; pQInfo->killed = 1;
dTrace("QInfo:%p set kill flag to free QInfo");
dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo);
SSchedMsg schedMsg = {0}; vnodeDecRefCount(pQInfo);
schedMsg.fp = vnodeFreeQInfoInQueueImpl;
// dTrace("QInfo:%p set kill flag and add to queue, stop query ASAP", pQInfo);
schedMsg.msg = NULL; // SSchedMsg schedMsg = {0};
schedMsg.thandle = (void *)1; // schedMsg.fp = vnodeFreeQInfoInQueueImpl;
schedMsg.ahandle = param;
taosScheduleTask(queryQhandle, &schedMsg); // schedMsg.msg = NULL;
// schedMsg.thandle = (void *)1;
// schedMsg.ahandle = param;
// taosScheduleTask(queryQhandle, &schedMsg);
} }
void vnodeFreeQInfo(void *param, bool decQueryRef) { void vnodeFreeQInfo(void *param, bool decQueryRef) {
...@@ -419,8 +422,6 @@ void vnodeFreeQInfo(void *param, bool decQueryRef) { ...@@ -419,8 +422,6 @@ void vnodeFreeQInfo(void *param, bool decQueryRef) {
if (!vnodeIsQInfoValid(param)) return; if (!vnodeIsQInfoValid(param)) return;
pQInfo->killed = 1; pQInfo->killed = 1;
TSDB_WAIT_TO_SAFE_DROP_QINFO(pQInfo);
SMeterObj *pObj = pQInfo->pObj; SMeterObj *pObj = pQInfo->pObj;
dTrace("QInfo:%p start to free SQInfo", pQInfo); dTrace("QInfo:%p start to free SQInfo", pQInfo);
...@@ -499,7 +500,30 @@ bool vnodeIsQInfoValid(void *param) { ...@@ -499,7 +500,30 @@ bool vnodeIsQInfoValid(void *param) {
* into local variable, then compare by using local variable * into local variable, then compare by using local variable
*/ */
uint64_t sig = pQInfo->signature; uint64_t sig = pQInfo->signature;
return (sig == (uint64_t)pQInfo) || (sig == TSDB_QINFO_QUERY_FLAG); return (sig == (uint64_t)pQInfo);
}
void vnodeDecRefCount(void *param) {
SQInfo *pQInfo = (SQInfo*) param;
assert(vnodeIsQInfoValid(pQInfo));
int32_t ref = atomic_sub_fetch_32(&pQInfo->refCount, 1);
assert(ref >= 0);
dTrace("QInfo:%p decrease obj refcount, %d", pQInfo, ref);
if (ref == 0) {
vnodeFreeQInfo(pQInfo, true);
}
}
void vnodeAddRefCount(void *param) {
SQInfo *pQInfo = (SQInfo*) param;
assert(vnodeIsQInfoValid(pQInfo));
int32_t ref = atomic_add_fetch_32(&pQInfo->refCount, 1);
dTrace("QInfo:%p add refcount, %d", pQInfo, ref);
} }
void vnodeQueryData(SSchedMsg *pMsg) { void vnodeQueryData(SSchedMsg *pMsg) {
...@@ -509,12 +533,11 @@ void vnodeQueryData(SSchedMsg *pMsg) { ...@@ -509,12 +533,11 @@ void vnodeQueryData(SSchedMsg *pMsg) {
pQInfo = (SQInfo *)pMsg->ahandle; pQInfo = (SQInfo *)pMsg->ahandle;
if (pQInfo->killed) { if (pQInfo->killed) {
TSDB_QINFO_RESET_SIG(pQInfo); dTrace("QInfo:%p it is already killed, abort", pQInfo);
dTrace("QInfo:%p it is already killed, reset signature and abort", pQInfo); vnodeDecRefCount(pQInfo);
return; return;
} }
assert(pQInfo->signature == TSDB_QINFO_QUERY_FLAG);
pQuery = &(pQInfo->query); pQuery = &(pQInfo->query);
SMeterObj *pObj = pQInfo->pObj; SMeterObj *pObj = pQInfo->pObj;
...@@ -582,13 +605,11 @@ void vnodeQueryData(SSchedMsg *pMsg) { ...@@ -582,13 +605,11 @@ void vnodeQueryData(SSchedMsg *pMsg) {
tclose(pQInfo->query.lfd); tclose(pQInfo->query.lfd);
} }
/* reset QInfo signature */
dTrace("QInfo:%p reset signature", pQInfo);
TSDB_QINFO_RESET_SIG(pQInfo);
sem_post(&pQInfo->dataReady); sem_post(&pQInfo->dataReady);
vnodeDecRefCount(pQInfo);
} }
void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs, void *vnodeQueryOnSingleTable(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExpr, SSqlFunctionExpr *pSqlExprs,
SQueryMeterMsg *pQueryMsg, int32_t *code) { SQueryMeterMsg *pQueryMsg, int32_t *code) {
SQInfo *pQInfo; SQInfo *pQInfo;
SQuery *pQuery; SQuery *pQuery;
...@@ -659,6 +680,7 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp ...@@ -659,6 +680,7 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp
} }
if (pQInfo->over == 1) { if (pQInfo->over == 1) {
vnodeAddRefCount(pQInfo); // for retrieve procedure
return pQInfo; return pQInfo;
} }
...@@ -667,15 +689,20 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp ...@@ -667,15 +689,20 @@ void *vnodeQueryInTimeRange(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyExp
schedMsg.fp = vnodeQueryData; schedMsg.fp = vnodeQueryData;
} }
// set in query flag /*
pQInfo->signature = TSDB_QINFO_QUERY_FLAG; * The reference count, which is 2, is for both the current query thread and the future retrieve request,
* which will always be issued by client to acquire data or free SQInfo struct.
*/
vnodeAddRefCount(pQInfo);
vnodeAddRefCount(pQInfo);
schedMsg.msg = NULL; schedMsg.msg = NULL;
schedMsg.thandle = (void *)1; schedMsg.thandle = (void *)1;
schedMsg.ahandle = pQInfo; schedMsg.ahandle = pQInfo;
dTrace("QInfo:%p set query flag and prepare runtime environment completed, wait for schedule", pQInfo); dTrace("QInfo:%p set query flag and prepare runtime environment completed, ref:%d, wait for schedule", pQInfo,
pQInfo->refCount);
taosScheduleTask(queryQhandle, &schedMsg); taosScheduleTask(queryQhandle, &schedMsg);
return pQInfo; return pQInfo;
...@@ -775,12 +802,13 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE ...@@ -775,12 +802,13 @@ void *vnodeQueryOnMultiMeters(SMeterObj **pMetersObj, SSqlGroupbyExpr *pGroupbyE
goto _error; goto _error;
} }
vnodeAddRefCount(pQInfo);
if (pQInfo->over == 1) { if (pQInfo->over == 1) {
return pQInfo; return pQInfo;
} }
pQInfo->signature = TSDB_QINFO_QUERY_FLAG; vnodeAddRefCount(pQInfo);
schedMsg.msg = NULL; schedMsg.msg = NULL;
schedMsg.thandle = (void *)1; schedMsg.thandle = (void *)1;
schedMsg.ahandle = pQInfo; schedMsg.ahandle = pQInfo;
...@@ -860,21 +888,15 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) { ...@@ -860,21 +888,15 @@ int vnodeSaveQueryResult(void *handle, char *data, int32_t *size) {
pQInfo->pointsRead); pQInfo->pointsRead);
if (pQInfo->over == 0) { if (pQInfo->over == 0) {
//dTrace("QInfo:%p set query flag, oldSig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__); dTrace("QInfo:%p set query flag, sig:%p, func:%s", pQInfo, pQInfo->signature, __FUNCTION__);
dTrace("QInfo:%p set query flag, oldSig:%p", pQInfo, pQInfo->signature);
uint64_t oldSignature = TSDB_QINFO_SET_QUERY_FLAG(pQInfo); if (pQInfo->killed == 1) {
dTrace("%p freed or killed, abort query", pQInfo);
/*
* If SQInfo has been released, the value of signature cannot be equalled to the address of pQInfo,
* since in release function, the original value has been destroyed. However, this memory area may be reused
* by another function. It may be 0 or any value, but it is rarely still be equalled to the address of SQInfo.
*/
if (oldSignature == 0 || oldSignature != (uint64_t)pQInfo) {
dTrace("%p freed or killed, old sig:%p abort query", pQInfo, oldSignature);
} else { } else {
vnodeAddRefCount(pQInfo);
dTrace("%p add query into task queue for schedule", pQInfo); dTrace("%p add query into task queue for schedule", pQInfo);
SSchedMsg schedMsg; SSchedMsg schedMsg = {0};
if (pQInfo->pMeterQuerySupporter != NULL) { if (pQInfo->pMeterQuerySupporter != NULL) {
if (pQInfo->pMeterQuerySupporter->pSidSet == NULL) { if (pQInfo->pMeterQuerySupporter->pSidSet == NULL) {
......
...@@ -376,7 +376,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) { ...@@ -376,7 +376,7 @@ int vnodeProcessQueryRequest(char *pMsg, int msgLen, SShellObj *pObj) {
if (QUERY_IS_STABLE_QUERY(pQueryMsg->queryType)) { if (QUERY_IS_STABLE_QUERY(pQueryMsg->queryType)) {
pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code); pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code);
} else { } else {
pObj->qhandle = vnodeQueryInTimeRange(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code); pObj->qhandle = vnodeQueryOnSingleTable(pMeterObjList, pGroupbyExpr, pExprs, pQueryMsg, &code);
} }
_query_over: _query_over:
...@@ -470,9 +470,11 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) { ...@@ -470,9 +470,11 @@ void vnodeExecuteRetrieveReq(SSchedMsg *pSched) {
pMsg += size; pMsg += size;
msgLen = pMsg - pStart; msgLen = pMsg - pStart;
assert(code != TSDB_CODE_ACTION_IN_PROGRESS);
if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) { if (numOfRows == 0 && (pRetrieve->qhandle == (uint64_t)pObj->qhandle) && (code != TSDB_CODE_ACTION_IN_PROGRESS)) {
dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code); dTrace("QInfo:%p %s free qhandle code:%d", pObj->qhandle, __FUNCTION__, code);
vnodeFreeQInfo(pObj->qhandle, true); vnodeDecRefCount(pObj->qhandle);
pObj->qhandle = NULL; pObj->qhandle = NULL;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册