提交 ed8f9345 编写于 作者: D dapan1121

add pSql parent/child check

上级 ee427b16
...@@ -344,6 +344,10 @@ typedef struct SSqlObj { ...@@ -344,6 +344,10 @@ typedef struct SSqlObj {
struct SSqlObj *prev, *next; struct SSqlObj *prev, *next;
int64_t self; int64_t self;
int64_t metaSubRid;
int64_t parentRid;
struct SSqlObj *metaSubPtr;
} SSqlObj; } SSqlObj;
typedef struct SSqlStream { typedef struct SSqlStream {
......
...@@ -38,6 +38,11 @@ TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt); ...@@ -38,6 +38,11 @@ TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts); void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
void tscSaveSubscriptionProgress(void* sub); void tscSaveSubscriptionProgress(void* sub);
static void tscCheckpSql(SSqlObj* pSql) {
SSqlObj* parent = pSql->param;
assert(parent->metaSubPtr == pSql && parent->self == pSql->parentRid && parent->metaSubRid == pSql->self);
}
static int32_t minMsgSize() { return tsRpcHeadSize + 100; } static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static int32_t getWaitingTimeInterval(int32_t count) { static int32_t getWaitingTimeInterval(int32_t count) {
int32_t initial = 100; // 100 ms by default int32_t initial = 100; // 100 ms by default
...@@ -1765,6 +1770,8 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) { ...@@ -1765,6 +1770,8 @@ int tscBuildHeartBeatMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
int tscProcessTableMetaRsp(SSqlObj *pSql) { int tscProcessTableMetaRsp(SSqlObj *pSql) {
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp; STableMetaMsg *pMetaMsg = (STableMetaMsg *)pSql->res.pRsp;
tscCheckpSql(pSql);
pMetaMsg->tid = htonl(pMetaMsg->tid); pMetaMsg->tid = htonl(pMetaMsg->tid);
pMetaMsg->sversion = htons(pMetaMsg->sversion); pMetaMsg->sversion = htons(pMetaMsg->sversion);
pMetaMsg->tversion = htons(pMetaMsg->tversion); pMetaMsg->tversion = htons(pMetaMsg->tversion);
...@@ -1943,6 +1950,8 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { ...@@ -1943,6 +1950,8 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
int tscProcessSTableVgroupRsp(SSqlObj *pSql) { int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
SSqlRes* pRes = &pSql->res; SSqlRes* pRes = &pSql->res;
tscCheckpSql(pSql);
// NOTE: the order of several table must be preserved. // NOTE: the order of several table must be preserved.
SSTableVgroupRspMsg *pStableVgroup = (SSTableVgroupRspMsg *)pRes->pRsp; SSTableVgroupRspMsg *pStableVgroup = (SSTableVgroupRspMsg *)pRes->pRsp;
pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables); pStableVgroup->numOfTables = htonl(pStableVgroup->numOfTables);
...@@ -2313,8 +2322,13 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn ...@@ -2313,8 +2322,13 @@ static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaIn
pNew->fp = tscTableMetaCallBack; pNew->fp = tscTableMetaCallBack;
pNew->param = pSql; pNew->param = pSql;
registerSqlObj(pNew); registerSqlObj(pNew);
pNew->parentRid = pSql->self;
pSql->metaSubRid = pNew->self;
pSql->metaSubPtr = pNew;
int32_t code = tscProcessSql(pNew); int32_t code = tscProcessSql(pNew);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify upper application that current process need to be terminated code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; // notify upper application that current process need to be terminated
...@@ -2423,6 +2437,9 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) { ...@@ -2423,6 +2437,9 @@ int tscGetSTableVgroupInfo(SSqlObj *pSql, int32_t clauseIndex) {
pNew->fp = tscTableMetaCallBack; pNew->fp = tscTableMetaCallBack;
pNew->param = pSql; pNew->param = pSql;
pNew->parentRid = pSql->self;
pSql->metaSubPtr = pNew;
pSql->metaSubRid = pNew->self;
code = tscProcessSql(pNew); code = tscProcessSql(pNew);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_ACTION_IN_PROGRESS; code = TSDB_CODE_TSC_ACTION_IN_PROGRESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册