提交 a020b457 编写于 作者: S shenglian zhou

query rsp will carry sversion and tversion

上级 8dee6d40
...@@ -2705,6 +2705,13 @@ int tscProcessQueryRsp(SSqlObj *pSql) { ...@@ -2705,6 +2705,13 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
tscResetForNextRetrieve(pRes); tscResetForNextRetrieve(pRes);
tscDebug("0x%"PRIx64" query rsp received, qId:0x%"PRIx64, pSql->self, pRes->qId); tscDebug("0x%"PRIx64" query rsp received, qId:0x%"PRIx64, pSql->self, pRes->qId);
STableMetaInfo* tableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
if (tableMetaInfo->pTableMeta->sversion < pQueryAttr->sVersion ||
tableMetaInfo->pTableMeta->tversion < pQueryAttr->tVersion) {
return TSDB_CODE_TSC_INVALID_SCHEMA_VERSION;
}
return 0; return 0;
} }
......
...@@ -28,7 +28,8 @@ typedef void* qinfo_t; ...@@ -28,7 +28,8 @@ typedef void* qinfo_t;
* @param qinfo * @param qinfo
* @return * @return
*/ */
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId); int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId,
int32_t* schemaVersion, int32_t* tagVersion);
/** /**
......
...@@ -530,6 +530,8 @@ typedef struct { ...@@ -530,6 +530,8 @@ typedef struct {
int8_t extend; int8_t extend;
int32_t code; int32_t code;
union{uint64_t qhandle; uint64_t qId;}; // query handle union{uint64_t qhandle; uint64_t qId;}; // query handle
int32_t sVersion;
int32_t tVersion;
} SQueryTableRsp; } SQueryTableRsp;
// todo: the show handle should be replaced with id // todo: the show handle should be replaced with id
......
...@@ -67,7 +67,8 @@ void freeParam(SQueryParam *param) { ...@@ -67,7 +67,8 @@ void freeParam(SQueryParam *param) {
tfree(param->prevResult); tfree(param->prevResult);
} }
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t qId) { int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t qId,
int32_t* schemaVersion, int32_t* tagVersion) {
assert(pQueryMsg != NULL && tsdb != NULL); assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -186,6 +187,8 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi ...@@ -186,6 +187,8 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
code = initQInfo(&pQueryMsg->tsBuf, tsdb, NULL, *pQInfo, &param, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL); code = initQInfo(&pQueryMsg->tsBuf, tsdb, NULL, *pQInfo, &param, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL);
*schemaVersion = tableGroupInfo.sVersion;
*tagVersion = tableGroupInfo.tVersion;
_over: _over:
if (param.pGroupbyExpr != NULL) { if (param.pGroupbyExpr != NULL) {
taosArrayDestroy(&(param.pGroupbyExpr->columnInfo)); taosArrayDestroy(&(param.pGroupbyExpr->columnInfo));
......
...@@ -239,12 +239,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { ...@@ -239,12 +239,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (contLen != 0) { if (contLen != 0) {
qinfo_t pQInfo = NULL; qinfo_t pQInfo = NULL;
uint64_t qId = genQueryId(); uint64_t qId = genQueryId();
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId); int32_t schemaVersion = -1;
int32_t tagVersion = -1;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId, &schemaVersion, &tagVersion);
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code; pRsp->code = code;
pRsp->qId = 0; pRsp->qId = 0;
pRsp->sVersion = schemaVersion;
pRsp->tVersion = tagVersion;
pRet->len = sizeof(SQueryTableRsp); pRet->len = sizeof(SQueryTableRsp);
pRet->rsp = pRsp; pRet->rsp = pRsp;
int32_t vgId = pVnode->vgId; int32_t vgId = pVnode->vgId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册