diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 68794701454281a6d10fda818b28a13a0f5d3e5a..4ccbdec443f6e8196855b97cf4fb6c2ee5e09348 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -344,6 +344,10 @@ typedef struct SSqlObj { struct SSqlObj *prev, *next; int64_t self; + + int64_t metaSubRid; + int64_t parentRid; + struct SSqlObj *metaSubPtr; } SSqlObj; typedef struct SSqlStream { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index fc1ed1a177d7d65f484744f6a53dc228ff20dcc5..c3be67929ab9db9894b5e7d8a765cbd11404a772 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -38,6 +38,11 @@ TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt); void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts); void tscSaveSubscriptionProgress(void* sub); +static void tscCheckpSql(SSqlObj* pSql) { + SSqlObj* parent = pSql->param; + assert(parent->metaSubPtr == pSql && parent->self == pSql->parentRid && parent->metaSubRid == pSql->self); +} + static int32_t minMsgSize() { return tsRpcHeadSize + 100; } static int32_t getWaitingTimeInterval(int32_t count) { int32_t initial = 100; // 100 ms by default @@ -1765,6 +1770,8 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { int tscProcessTableMetaRsp(SSqlObj *pSql) { STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp; + tscCheckpSql(pSql); + pMetaMsg->tid = htonl(pMetaMsg->tid); pMetaMsg->sversion = htons(pMetaMsg->sversion); pMetaMsg->tversion = htons(pMetaMsg->tversion); @@ -1943,6 +1950,8 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { int tscProcessSTableVgroupRsp(SSqlObj *pSql) { SSqlRes* pRes = &pSql->res; + tscCheckpSql(pSql); + // NOTE: the order of several table must be preserved. SSTableVgroupRspMsg *pStableVgroup = (SSTableVgroupRspMsg *)pRes->pRsp; pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables); @@ -2313,8 +2322,13 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn pNew->fp = tscTableMetaCallBack; pNew->param = pSql; + registerSqlObj(pNew); + pNew->parentRid = pSql->self; + pSql->metaSubRid = pNew->self; + pSql->metaSubPtr = pNew; + int32_t code = tscProcessSql(pNew); if (code == TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify upper application that current process need to be terminated @@ -2423,6 +2437,9 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { pNew->fp = tscTableMetaCallBack; pNew->param = pSql; + pNew->parentRid = pSql->self; + pSql->metaSubPtr = pNew; + pSql->metaSubRid = pNew->self; code = tscProcessSql(pNew); if (code == TSDB_CODE_SUCCESS) { code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;