diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index d246a8cd23dbc08055445d1169fda27c1760b0d0..eb3038a152b2894423d3194ec37a3b9a7b31b2ad 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -418,7 +418,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { // single table query error need to be handled here. if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || - rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { + (rpcMsg->code == TSDB_CODE_QRY_INVALID_SCHEMA_VERSION) || + rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY )) { // 1. super table subquery // 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer @@ -508,18 +509,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { if (pRes->code == TSDB_CODE_SUCCESS && pCmd->command < TSDB_SQL_MAX && tscProcessMsgRsp[pCmd->command]) { rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql); - if (rpcMsg->code == TSDB_CODE_TSC_INVALID_SCHEMA_VERSION) { - pSql->res.code = rpcMsg->code; - tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry); - - ++pSql->retry; - if (pSql->retry > pSql->maxRetry) { - tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry); - } else { - pSql->retryReason = rpcMsg->code; - rpcMsg->code = tscRenewTableMeta(pSql); - } - } } bool shouldFree = tscShouldBeFreed(pSql); @@ -976,6 +965,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->queryType = htonl(pQueryInfo->type); pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen); + pQueryMsg->schemaVersion = htons(pTableMeta->sversion); + pQueryMsg->tagVersion = htons(pTableMeta->tversion); // set column list ids size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); char *pMsg = (char *)(pQueryMsg->tableCols) + numOfCols * sizeof(SColumnInfo); @@ -2705,26 +2696,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) { tscResetForNextRetrieve(pRes); - int32_t sVersion = -1; - int32_t tVersion = -1; - if (pQueryAttr->extend == 1) { - STLV* tlv = (STLV*)(pRes->pRsp + sizeof(SQueryTableRsp)); - while (tlv->type != TLV_TYPE_END_MARK) { - switch (ntohs(tlv->type)) { - case TLV_TYPE_META_VERSION: - sVersion = ntohl(*(int32_t*)tlv->value); - tVersion = ntohl(*(int32_t*)(tlv->value + sizeof(int32_t))); - break; - } - tlv = (STLV*) ((char*)tlv + sizeof(STLV) + ntohl(tlv->len)); - } - } - STableMetaInfo* tableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0); - if (tableMetaInfo->pTableMeta->sversion < sVersion || - tableMetaInfo->pTableMeta->tversion < tVersion) { - return TSDB_CODE_TSC_INVALID_SCHEMA_VERSION; - } - tscDebug("0x%"PRIx64" query rsp received, qId:0x%"PRIx64, pSql->self, pRes->qId); return 0; diff --git a/src/inc/query.h b/src/inc/query.h index 5f05d66f8f41cbafc2435ff8a711514dc27970f0..0872e3dbaa517ded77dd758b30e69f273c13a580 100644 --- a/src/inc/query.h +++ b/src/inc/query.h @@ -28,8 +28,7 @@ typedef void* qinfo_t; * @param qinfo * @return */ -int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId, - int32_t* schemaVersion, int32_t* tagVersion); +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId); /** diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 0eb2479447bc6ae65a0eef4e41aba54ea6f9b518..44192403972cd9dc54b3f2a965e1468595e17487 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -292,6 +292,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica") #define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error") #define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition") +#define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version") // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired" diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 26ce551e397fccfe6eb378aa0de2de771dfae10f..bd173494430b84bf703d3df6ec24e991a56b35f4 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -523,6 +523,8 @@ typedef struct { int32_t udfNum; // number of udf function int32_t udfContentOffset; int32_t udfContentLen; + int16_t schemaVersion; + int16_t tagVersion; SColumnInfo tableCols[]; } SQueryTableMsg; diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index ae435efecd77a1e4a4a5ebff82a9280ad5976415..adf688feba4e45b08ae5e13805eae3962006ccb1 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -67,8 +67,7 @@ void freeParam(SQueryParam *param) { tfree(param->prevResult); } -int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t qId, - int32_t* schemaVersion, int32_t* tagVersion) { +int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t qId) { assert(pQueryMsg != NULL && tsdb != NULL); int32_t code = TSDB_CODE_SUCCESS; @@ -163,6 +162,13 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi assert(0); } + int16_t queryTagVersion = ntohs(pQueryMsg->tagVersion); + int16_t querySchemaVersion = ntohs(pQueryMsg->schemaVersion); + if (queryTagVersion < tableGroupInfo.tVersion || querySchemaVersion < tableGroupInfo.sVersion) { + code = TSDB_CODE_QRY_INVALID_SCHEMA_VERSION; + goto _over; + } + code = checkForQueryBuf(tableGroupInfo.numOfTables); if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort goto _over; @@ -187,8 +193,6 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi code = initQInfo(&pQueryMsg->tsBuf, tsdb, NULL, *pQInfo, ¶m, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL); - *schemaVersion = tableGroupInfo.sVersion; - *tagVersion = tableGroupInfo.tVersion; _over: if (param.pGroupbyExpr != NULL) { taosArrayDestroy(&(param.pGroupbyExpr->columnInfo)); diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 7fd74110cbb4aa101d179d16fc66769c133da246..53274eb84250a813bee09c03aef9def009ac0d6c 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -239,30 +239,13 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) { if (contLen != 0) { qinfo_t pQInfo = NULL; uint64_t qId = genQueryId(); - int32_t schemaVersion = -1; - int32_t tagVersion = -1; - code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId, &schemaVersion, &tagVersion); + code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId); size_t rspLen = sizeof(SQueryTableRsp); - rspLen += (sizeof(STLV) + sizeof(int32_t) + sizeof(int32_t)); - rspLen += sizeof(STLV); SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(rspLen); pRsp->code = code; pRsp->qId = 0; - pRsp->extend = 1; - STLV* tlv = (STLV*)((char*)(pRsp) + sizeof(SQueryTableRsp)); - tlv->type = htons(TLV_TYPE_META_VERSION); - tlv->len = htonl(sizeof(int32_t) + sizeof(int32_t)); - int32_t sVersion = htonl(schemaVersion); - int32_t tVersion = htonl(tagVersion); - memcpy(tlv->value, &sVersion, sizeof(int32_t)); - memcpy(tlv->value + sizeof(int32_t), &tVersion, sizeof(int32_t)); - - STLV* tlvEnd = (STLV*)((char*)tlv + sizeof(STLV) + ntohl(tlv->len)); - tlvEnd->type = htons(TLV_TYPE_END_MARK); - tlvEnd->len = 0; - pRet->len = rspLen; pRet->rsp = pRsp; int32_t vgId = pVnode->vgId;