提交 e4fa0791 编写于 作者: S shenglian zhou

(query):return invalid version when taosd meet lower table schema version/tag version

上级 04e4f8cf
......@@ -418,7 +418,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
// single table query error need to be handled here.
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
(((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
(rpcMsg->code == TSDB_CODE_QRY_INVALID_SCHEMA_VERSION) ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY )) {
// 1. super table subquery
// 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer
......@@ -508,18 +509,6 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
if (pRes->code == TSDB_CODE_SUCCESS && pCmd->command < TSDB_SQL_MAX && tscProcessMsgRsp[pCmd->command]) {
rpcMsg->code = (*tscProcessMsgRsp[pCmd->command])(pSql);
if (rpcMsg->code == TSDB_CODE_TSC_INVALID_SCHEMA_VERSION) {
pSql->res.code = rpcMsg->code;
tscWarn("0x%" PRIx64 " it shall renew table meta, code:%s, retry:%d", pSql->self, tstrerror(rpcMsg->code), pSql->retry);
++pSql->retry;
if (pSql->retry > pSql->maxRetry) {
tscError("0x%" PRIx64 " max retry %d reached, give up", pSql->self, pSql->maxRetry);
} else {
pSql->retryReason = rpcMsg->code;
rpcMsg->code = tscRenewTableMeta(pSql);
}
}
}
bool shouldFree = tscShouldBeFreed(pSql);
......@@ -976,6 +965,8 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->queryType = htonl(pQueryInfo->type);
pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen);
pQueryMsg->schemaVersion = htons(pTableMeta->sversion);
pQueryMsg->tagVersion = htons(pTableMeta->tversion);
// set column list ids
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
char *pMsg = (char *)(pQueryMsg->tableCols) + numOfCols * sizeof(SColumnInfo);
......@@ -2705,26 +2696,6 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
tscResetForNextRetrieve(pRes);
int32_t sVersion = -1;
int32_t tVersion = -1;
if (pQueryAttr->extend == 1) {
STLV* tlv = (STLV*)(pRes->pRsp + sizeof(SQueryTableRsp));
while (tlv->type != TLV_TYPE_END_MARK) {
switch (ntohs(tlv->type)) {
case TLV_TYPE_META_VERSION:
sVersion = ntohl(*(int32_t*)tlv->value);
tVersion = ntohl(*(int32_t*)(tlv->value + sizeof(int32_t)));
break;
}
tlv = (STLV*) ((char*)tlv + sizeof(STLV) + ntohl(tlv->len));
}
}
STableMetaInfo* tableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
if (tableMetaInfo->pTableMeta->sversion < sVersion ||
tableMetaInfo->pTableMeta->tversion < tVersion) {
return TSDB_CODE_TSC_INVALID_SCHEMA_VERSION;
}
tscDebug("0x%"PRIx64" query rsp received, qId:0x%"PRIx64, pSql->self, pRes->qId);
return 0;
......
......@@ -28,8 +28,7 @@ typedef void* qinfo_t;
* @param qinfo
* @return
*/
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId,
int32_t* schemaVersion, int32_t* tagVersion);
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId);
/**
......
......@@ -292,6 +292,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica")
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error")
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition")
#define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired"
......
......@@ -523,6 +523,8 @@ typedef struct {
int32_t udfNum; // number of udf function
int32_t udfContentOffset;
int32_t udfContentLen;
int16_t schemaVersion;
int16_t tagVersion;
SColumnInfo tableCols[];
} SQueryTableMsg;
......
......@@ -67,8 +67,7 @@ void freeParam(SQueryParam *param) {
tfree(param->prevResult);
}
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t qId,
int32_t* schemaVersion, int32_t* tagVersion) {
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qinfo_t* pQInfo, uint64_t qId) {
assert(pQueryMsg != NULL && tsdb != NULL);
int32_t code = TSDB_CODE_SUCCESS;
......@@ -163,6 +162,13 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
assert(0);
}
int16_t queryTagVersion = ntohs(pQueryMsg->tagVersion);
int16_t querySchemaVersion = ntohs(pQueryMsg->schemaVersion);
if (queryTagVersion < tableGroupInfo.tVersion || querySchemaVersion < tableGroupInfo.sVersion) {
code = TSDB_CODE_QRY_INVALID_SCHEMA_VERSION;
goto _over;
}
code = checkForQueryBuf(tableGroupInfo.numOfTables);
if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort
goto _over;
......@@ -187,8 +193,6 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
code = initQInfo(&pQueryMsg->tsBuf, tsdb, NULL, *pQInfo, &param, (char*)pQueryMsg, pQueryMsg->prevResultLen, NULL);
*schemaVersion = tableGroupInfo.sVersion;
*tagVersion = tableGroupInfo.tVersion;
_over:
if (param.pGroupbyExpr != NULL) {
taosArrayDestroy(&(param.pGroupbyExpr->columnInfo));
......
......@@ -239,30 +239,13 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
if (contLen != 0) {
qinfo_t pQInfo = NULL;
uint64_t qId = genQueryId();
int32_t schemaVersion = -1;
int32_t tagVersion = -1;
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId, &schemaVersion, &tagVersion);
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId);
size_t rspLen = sizeof(SQueryTableRsp);
rspLen += (sizeof(STLV) + sizeof(int32_t) + sizeof(int32_t));
rspLen += sizeof(STLV);
SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(rspLen);
pRsp->code = code;
pRsp->qId = 0;
pRsp->extend = 1;
STLV* tlv = (STLV*)((char*)(pRsp) + sizeof(SQueryTableRsp));
tlv->type = htons(TLV_TYPE_META_VERSION);
tlv->len = htonl(sizeof(int32_t) + sizeof(int32_t));
int32_t sVersion = htonl(schemaVersion);
int32_t tVersion = htonl(tagVersion);
memcpy(tlv->value, &sVersion, sizeof(int32_t));
memcpy(tlv->value + sizeof(int32_t), &tVersion, sizeof(int32_t));
STLV* tlvEnd = (STLV*)((char*)tlv + sizeof(STLV) + ntohl(tlv->len));
tlvEnd->type = htons(TLV_TYPE_END_MARK);
tlvEnd->len = 0;
pRet->len = rspLen;
pRet->rsp = pRsp;
int32_t vgId = pVnode->vgId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册