diff --git a/src/client/inc/tscSubquery.h b/src/client/inc/tscSubquery.h index d3996ccf7fe85a6405addfcbef023aa2ae6bf08b..43c9b009bfdc537b033b7cffbe00bb673c627847 100644 --- a/src/client/inc/tscSubquery.h +++ b/src/client/inc/tscSubquery.h @@ -43,6 +43,11 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql); char *getArithmeticInputSrc(void *param, const char *name, int32_t colId); +void tscLockByThread(int64_t *lockedBy); + +void tscUnlockByThread(int64_t *lockedBy); + + #ifdef __cplusplus } #endif diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 25a299d0988144980dded10b170cbe7b7c662b04..ecef3792fad59c3530c36c3912e2a4730565c4b3 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -347,6 +347,11 @@ typedef struct SSqlObj { SSubqueryState subState; struct SSqlObj **pSubs; + int64_t metaRid; + int64_t svgroupRid; + + int64_t squeryLock; + struct SSqlObj *prev, *next; int64_t self; } SSqlObj; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index 41e935441fd11428420efc98dc53616b7d2317dc..91ac52adbbe629cdd0836e24a887fba14c647142 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -402,8 +402,10 @@ void tscAsyncResultOnError(SSqlObj *pSql) { int tscSendMsgToServer(SSqlObj *pSql); void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { - SSqlObj *pSql = (SSqlObj *)param; - if (pSql == NULL || pSql->signature != pSql) return; + SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param); + if (pSql == NULL) return; + + assert(pSql->signature == pSql && (int64_t)param == pSql->self); SSqlCmd *pCmd = &pSql->cmd; SSqlRes *pRes = &pSql->res; @@ -428,7 +430,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { code = tscGetTableMeta(pSql, pTableMetaInfo); assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS); - if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + taosReleaseRef(tscObjRef, pSql->self); return; } @@ -436,6 +439,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { // tscProcessSql can add error into async res tscProcessSql(pSql); + taosReleaseRef(tscObjRef, pSql->self); return; } else { // continue to process normal async query if (pCmd->parseFinished) { @@ -446,6 +450,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + taosReleaseRef(tscObjRef, pSql->self); return; } @@ -458,6 +463,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { code = tsParseSql(pSql, true); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + taosReleaseRef(tscObjRef, pSql->self); return; } else if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -468,12 +474,14 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { tscProcessSql(pSql); } + taosReleaseRef(tscObjRef, pSql->self); return; } else { tscDebug("%p continue parse sql after get table meta", pSql); code = tsParseSql(pSql, false); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + taosReleaseRef(tscObjRef, pSql->self); return; } else if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -483,12 +491,14 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0); code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + taosReleaseRef(tscObjRef, pSql->self); return; } else { assert(code == TSDB_CODE_SUCCESS); } (*pSql->fp)(pSql->param, pSql, code); + taosReleaseRef(tscObjRef, pSql->self); return; } @@ -501,6 +511,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { code = tscGetTableMeta(pSql, pTableMetaInfo); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + taosReleaseRef(tscObjRef, pSql->self); return; } else if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -509,6 +520,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex); if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + taosReleaseRef(tscObjRef, pSql->self); return; } else if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -522,10 +534,15 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { (*pSql->fp)(pSql->param, pSql, code); + taosReleaseRef(tscObjRef, pSql->self); + return; } tscDoQuery(pSql); + + taosReleaseRef(tscObjRef, pSql->self); + return; _error: @@ -533,4 +550,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) { pSql->res.code = code; tscAsyncResultOnError(pSql); } + + taosReleaseRef(tscObjRef, pSql->self); } diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 4949aa9b9dab6efb96f411f433ec420c6fd6d338..e72042e7ab9ef36e17c57977585e49acb7f7334a 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -423,7 +423,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it taosRemoveRef(tscObjRef, pSql->self); - tscDebug("%p sqlObj is automatically freed", pSql); + tscDebug("%p sqlObj is automatically freed", pSql); } rpcFreeCont(rpcMsg->pCont); @@ -1992,16 +1992,20 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { } int tscProcessSTableVgroupRsp(SSqlObj *pSql) { + // master sqlObj locates in param + SSqlObj* parent = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)pSql->param); + if(parent == NULL) { + return pSql->res.code; + } + + assert(parent->signature == parent && (int64_t)pSql->param == parent->self); + SSqlRes* pRes = &pSql->res; // NOTE: the order of several table must be preserved. SSTableVgroupRspMsg *pStableVgroup = (SSTableVgroupRspMsg *)pRes->pRsp; pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables); char *pMsg = pRes->pRsp + sizeof(SSTableVgroupRspMsg); - - // master sqlObj locates in param - SSqlObj* parent = pSql->param; - assert(parent != NULL); SSqlCmd* pCmd = &parent->cmd; for(int32_t i = 0; i < pStableVgroup->numOfTables; ++i) { @@ -2035,6 +2039,8 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) { pMsg += size; } + + taosReleaseRef(tscObjRef, parent->self); return pSql->res.code; } @@ -2328,10 +2334,14 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn tscDebug("%p new pSqlObj:%p to get tableMeta, auto create:%d", pSql, pNew, pNew->cmd.autoCreated); + registerSqlObj(pNew); + pNew->fp = tscTableMetaCallBack; - pNew->param = pSql; + pNew->param = (void *)pSql->self; - registerSqlObj(pNew); + tscDebug("%p metaRid from %" PRId64 " to %" PRId64 , pSql, pSql->metaRid, pNew->self); + + pSql->metaRid = pNew->self; int32_t code = tscProcessSql(pNew); if (code == TSDB_CODE_SUCCESS) { @@ -2348,6 +2358,7 @@ int32_t tscGetTableMeta(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) { uint32_t size = tscGetTableMetaMaxSize(); pTableMetaInfo->pTableMeta = calloc(1, size); + pTableMetaInfo->pTableMeta->tableType = -1; pTableMetaInfo->pTableMeta->tableInfo.numOfColumns = -1; int32_t len = (int32_t) strlen(pTableMetaInfo->name); @@ -2447,10 +2458,15 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { pNewQueryInfo->numOfTables = pQueryInfo->numOfTables; registerSqlObj(pNew); + tscDebug("%p svgroupRid from %" PRId64 " to %" PRId64 , pSql, pSql->svgroupRid, pNew->self); + + pSql->svgroupRid = pNew->self; + + tscDebug("%p new sqlObj:%p to get vgroupInfo, numOfTables:%d", pSql, pNew, pNewQueryInfo->numOfTables); pNew->fp = tscTableMetaCallBack; - pNew->param = pSql; + pNew->param = (void *)pSql->self; code = tscProcessSql(pNew); if (code == TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index 377cb24b1d5b1c92c702cd113c4feea185617911..4f5243b15a7ac0dc809d631790a408d0f01853ca 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -694,6 +694,8 @@ static void tscKillSTableQuery(SSqlObj *pSql) { // set the master sqlObj flag to cancel query pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + tscLockByThread(&pSql->squeryLock); + for (int i = 0; i < pSql->subState.numOfSub; ++i) { // NOTE: pSub may have been released already here SSqlObj *pSub = pSql->pSubs[i]; @@ -713,6 +715,12 @@ static void tscKillSTableQuery(SSqlObj *pSql) { taosReleaseRef(tscObjRef, pSubObj->self); } + if (pSql->subState.numOfSub <= 0) { + tscAsyncResultOnError(pSql); + } + + tscUnlockByThread(&pSql->squeryLock); + tscDebug("%p super table query cancelled", pSql); } diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c index 96fdde4fd61958efbf306a3b578a4b357a3ea930..76e3456b7e232d3ac0884b43755da989c85f9ffb 100644 --- a/src/client/src/tscSubquery.c +++ b/src/client/src/tscSubquery.c @@ -533,7 +533,7 @@ static void quitAllSubquery(SSqlObj* pSqlObj, SJoinSupporter* pSupporter) { freeJoinSubqueryObj(pSqlObj); } - tscDestroyJoinSupporter(pSupporter); + //tscDestroyJoinSupporter(pSupporter); } // update the query time range according to the join results on timestamp @@ -1658,6 +1658,25 @@ static void doCleanupSubqueries(SSqlObj *pSql, int32_t numOfSubs) { } } +void tscLockByThread(int64_t *lockedBy) { + int64_t tid = taosGetSelfPthreadId(); + int i = 0; + while (atomic_val_compare_exchange_64(lockedBy, 0, tid) != 0) { + if (++i % 100 == 0) { + sched_yield(); + } + } +} + +void tscUnlockByThread(int64_t *lockedBy) { + int64_t tid = taosGetSelfPthreadId(); + if (atomic_val_compare_exchange_64(lockedBy, tid, 0) != tid) { + assert(false); + } +} + + + int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) { SSqlRes *pRes = &pSql->res; SSqlCmd *pCmd = &pSql->cmd; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 5d818692ed90480c4b461c1fe8aba9d785dee694..7774ad348c895f4d0a8eefac8818dafb60de7241 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -467,6 +467,18 @@ void tscFreeRegisteredSqlObj(void *pSql) { } +void tscFreeMetaSqlObj(int64_t *rid){ + if (RID_VALID(*rid)) { + SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, *rid); + if (pSql) { + taosRemoveRef(tscObjRef, *rid); + taosReleaseRef(tscObjRef, *rid); + } + + *rid = 0; + } +} + void tscFreeSqlObj(SSqlObj* pSql) { if (pSql == NULL || pSql->signature != pSql) { return; @@ -476,6 +488,9 @@ void tscFreeSqlObj(SSqlObj* pSql) { pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED; + tscFreeMetaSqlObj(&pSql->metaRid); + tscFreeMetaSqlObj(&pSql->svgroupRid); + tscFreeSubobj(pSql); SSqlCmd* pCmd = &pSql->cmd; @@ -504,6 +519,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { pCmd->allocSize = 0; tsem_destroy(&pSql->rspSem); + memset(pSql, 0, sizeof(*pSql)); free(pSql); } @@ -2192,7 +2208,9 @@ void tscDoQuery(SSqlObj* pSql) { tscProcessSql(pSql); } else { // secondary stage join query. if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query + tscLockByThread(&pSql->squeryLock); tscHandleMasterSTableQuery(pSql); + tscUnlockByThread(&pSql->squeryLock); } else { tscProcessSql(pSql); } @@ -2201,7 +2219,9 @@ void tscDoQuery(SSqlObj* pSql) { return; } else if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { // super table query + tscLockByThread(&pSql->squeryLock); tscHandleMasterSTableQuery(pSql); + tscUnlockByThread(&pSql->squeryLock); return; }