提交 833cda17 编写于 作者: S shenglian zhou 提交者: shenglian zhou

[TD-11389]<fix>(query):pass sverion/tversion from vnode to client so that...

[TD-11389]<fix>(query):pass sverion/tversion from vnode to client so that client can clear meta when its meta is older
上级 812bef29
...@@ -307,6 +307,8 @@ typedef struct { ...@@ -307,6 +307,8 @@ typedef struct {
int32_t row; int32_t row;
int16_t numOfCols; int16_t numOfCols;
int16_t precision; int16_t precision;
int32_t sVersion;
int32_t tVersion;
bool completed; bool completed;
int32_t code; int32_t code;
char * data; char * data;
......
...@@ -2756,6 +2756,8 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { ...@@ -2756,6 +2756,8 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
pRes->numOfRows = htonl(pRetrieve->numOfRows); pRes->numOfRows = htonl(pRetrieve->numOfRows);
pRes->precision = htons(pRetrieve->precision); pRes->precision = htons(pRetrieve->precision);
pRes->sVersion = htonl(pRetrieve->sVersion);
pRes->tVersion = htonl(pRetrieve->tVersion);
pRes->offset = htobe64(pRetrieve->offset); pRes->offset = htobe64(pRetrieve->offset);
pRes->useconds = htobe64(pRetrieve->useconds); pRes->useconds = htobe64(pRetrieve->useconds);
pRes->completed = (pRetrieve->completed == 1); pRes->completed = (pRetrieve->completed == 1);
......
...@@ -1713,6 +1713,21 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -1713,6 +1713,21 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscFreeMetaSqlObj(&pSql->svgroupRid); tscFreeMetaSqlObj(&pSql->svgroupRid);
SSqlCmd* pCmd = &pSql->cmd; SSqlCmd* pCmd = &pSql->cmd;
bool clearCachedMeta = false;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo != NULL) {
STableMeta* pTableMeta = NULL;
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo != NULL) {
pTableMeta = pTableMetaInfo->pTableMeta;
}
if (pTableMeta != NULL) {
if (pTableMeta->sversion < pSql->res.sVersion || pTableMeta->tversion < pSql->res.tVersion) {
clearCachedMeta = true;
}
}
}
int32_t cmd = pCmd->command; int32_t cmd = pCmd->command;
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_GLOBALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT || if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_GLOBALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) { cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
...@@ -1731,7 +1746,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { ...@@ -1731,7 +1746,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pSql->self = 0; pSql->self = 0;
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
tscResetSqlCmd(pCmd, false, pSql->self); tscResetSqlCmd(pCmd, clearCachedMeta, pSql->self);
tfree(pCmd->payload); tfree(pCmd->payload);
pCmd->allocSize = 0; pCmd->allocSize = 0;
......
...@@ -545,6 +545,8 @@ typedef struct SRetrieveTableRsp { ...@@ -545,6 +545,8 @@ typedef struct SRetrieveTableRsp {
int32_t numOfRows; int32_t numOfRows;
int8_t completed; // all results are returned to client int8_t completed; // all results are returned to client
int16_t precision; int16_t precision;
int32_t sVersion;
int32_t tVersion;
int64_t offset; // updated offset value for multi-vnode projection query int64_t offset; // updated offset value for multi-vnode projection query
int64_t useconds; int64_t useconds;
int8_t compressed; int8_t compressed;
......
...@@ -229,6 +229,8 @@ typedef struct { ...@@ -229,6 +229,8 @@ typedef struct {
uint32_t numOfTables; uint32_t numOfTables;
SArray *pGroupList; SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid SHashObj *map; // speedup acquire the tableQueryInfo by table uid
int32_t sVersion;
int32_t tVersion;
} STableGroupInfo; } STableGroupInfo;
#define TSDB_BLOCK_DIST_STEP_ROWS 16 #define TSDB_BLOCK_DIST_STEP_ROWS 16
......
...@@ -407,6 +407,8 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co ...@@ -407,6 +407,8 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
} }
(*pRsp)->precision = htons(pQueryAttr->precision); (*pRsp)->precision = htons(pQueryAttr->precision);
(*pRsp)->sVersion = htonl(pQueryAttr->tableGroupInfo.sVersion);
(*pRsp)->tVersion = htonl(pQueryAttr->tableGroupInfo.tVersion);
(*pRsp)->compressed = (int8_t)((tsCompressColData != -1) && checkNeedToCompressQueryCol(pQInfo)); (*pRsp)->compressed = (int8_t)((tsCompressColData != -1) && checkNeedToCompressQueryCol(pQInfo));
if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) { if (GET_NUM_OF_RESULTS(&(pQInfo->runtimeEnv)) > 0 && pQInfo->code == TSDB_CODE_SUCCESS) {
......
...@@ -3981,7 +3981,8 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, cons ...@@ -3981,7 +3981,8 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, cons
pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res); pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
pGroupInfo->sVersion = tsdbGetTableSchema(pTable)->version;
pGroupInfo->tVersion = pTagSchema->version;
tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu", tsdb, tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu", tsdb,
pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList)); pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
...@@ -4068,6 +4069,9 @@ int32_t tsdbGetOneTableGroup(STsdbRepo* tsdb, uint64_t uid, TSKEY startKey, STab ...@@ -4068,6 +4069,9 @@ int32_t tsdbGetOneTableGroup(STsdbRepo* tsdb, uint64_t uid, TSKEY startKey, STab
taosArrayPush(group, &info); taosArrayPush(group, &info);
taosArrayPush(pGroupInfo->pGroupList, &group); taosArrayPush(pGroupInfo->pGroupList, &group);
pGroupInfo->sVersion = tsdbGetTableSchema(pTable)->version;
pGroupInfo->tVersion = tsdbGetTableTagSchema(pTable)->version;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
_error: _error:
...@@ -4084,6 +4088,8 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl ...@@ -4084,6 +4088,8 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo)); SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
int32_t sVersion = -1;
int32_t tVersion = -1;
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
STableIdInfo *id = taosArrayGet(pTableIdList, i); STableIdInfo *id = taosArrayGet(pTableIdList, i);
...@@ -4105,6 +4111,18 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl ...@@ -4105,6 +4111,18 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl
STableKeyInfo info = {.pTable = pTable, .lastKey = id->key}; STableKeyInfo info = {.pTable = pTable, .lastKey = id->key};
taosArrayPush(group, &info); taosArrayPush(group, &info);
if (sVersion == -1) {
sVersion = tsdbGetTableSchema(pTable)->version;
} else {
assert (sVersion == tsdbGetTableSchema(pTable)->version);
}
if (tVersion == -1) {
tVersion = tsdbGetTableTagSchema(pTable)->version;
} else {
assert (tVersion == tsdbGetTableTagSchema(pTable)->version);
}
} }
if (tsdbUnlockRepoMeta(tsdb) < 0) { if (tsdbUnlockRepoMeta(tsdb) < 0) {
...@@ -4119,6 +4137,9 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl ...@@ -4119,6 +4137,9 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl
taosArrayDestroy(&group); taosArrayDestroy(&group);
} }
pGroupInfo->sVersion = sVersion;
pGroupInfo->tVersion = tVersion;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册