未验证 提交 67e9d380 编写于 作者: S shenglian-zhou 提交者: GitHub

Merge pull request #9722 from taosdata/szhou/feature/meta-sver-tver

TD-11389: renew table meta when query rsp include higher version
......@@ -207,7 +207,7 @@ TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index);
void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
void tscFieldInfoClear(SFieldInfo* pFieldInfo);
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList);
......
......@@ -418,7 +418,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
// single table query error need to be handled here.
if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) &&
(((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) {
(rpcMsg->code == TSDB_CODE_QRY_INVALID_SCHEMA_VERSION) ||
rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY )) {
// 1. super table subquery
// 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer
......@@ -511,6 +512,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
}
bool shouldFree = tscShouldBeFreed(pSql);
if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
if (rpcMsg->code != TSDB_CODE_SUCCESS) {
pRes->code = rpcMsg->code;
......@@ -962,7 +964,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols);
pQueryMsg->queryType = htonl(pQueryInfo->type);
pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen);
// set column list ids
size_t numOfCols = taosArrayGetSize(pQueryInfo->colList);
char *pMsg = (char *)(pQueryMsg->tableCols) + numOfCols * sizeof(SColumnInfo);
......@@ -1148,21 +1150,21 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
pMsg += sqlLen;
/*
//MSG EXTEND DEMO
pQueryMsg->extend = 1;
STLV *tlv = (STLV *)pMsg;
tlv->type = htons(TLV_TYPE_DUMMY);
tlv->len = htonl(sizeof(int16_t));
*(int16_t *)tlv->value = htons(12345);
tlv->type = htons(TLV_TYPE_META_VERSION);
tlv->len = htonl(sizeof(int16_t) * 2);
*(int16_t*)tlv->value = htons(pTableMeta->sversion);
*(int16_t*)(tlv->value+sizeof(int16_t)) = htons(pTableMeta->tversion);
pMsg += sizeof(*tlv) + ntohl(tlv->len);
tlv = (STLV *)pMsg;
tlv->type = htons(TLV_TYPE_END_MARK);
tlv->len = 0;
pMsg += sizeof(*tlv);
*/
int32_t msgLen = (int32_t)(pMsg - pCmd->payload);
......@@ -2691,7 +2693,9 @@ int tscProcessQueryRsp(SSqlObj *pSql) {
pRes->data = NULL;
tscResetForNextRetrieve(pRes);
tscDebug("0x%"PRIx64" query rsp received, qId:0x%"PRIx64, pSql->self, pRes->qId);
return 0;
}
......@@ -2703,7 +2707,7 @@ static void decompressQueryColData(SSqlObj *pSql, SSqlRes *pRes, SQueryInfo* pQu
compSizes = (int32_t *)(pData + compLen);
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1);
int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
int32_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1);
char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset));
char *p = outputBuf;
......@@ -2804,11 +2808,12 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
pRes->row = 0;
tscDebug("0x%"PRIx64" numOfRows:%d, offset:%" PRId64 ", complete:%d, qId:0x%"PRIx64, pSql->self, pRes->numOfRows, pRes->offset,
pRes->completed, pRes->qId);
pRes->completed, pRes->qId);
return 0;
}
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) {
......
......@@ -2369,7 +2369,7 @@ TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index) {
return &((SInternalField*)TARRAY_GET_ELEM(pFieldInfo->internalField, index))->field;
}
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) {
int32_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) {
SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, index);
assert(pInfo != NULL && pInfo->pExpr->pExpr == NULL);
......
......@@ -115,6 +115,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x0225) //"Invalid line protocol type")
#define TSDB_CODE_TSC_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x0226) //"Invalid timestamp precision type")
#define TSDB_CODE_TSC_RES_TOO_MANY TAOS_DEF_ERROR_CODE(0, 0x0227) //"Result set too large to be output")
#define TSDB_CODE_TSC_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0228) //"invalid table schema version")
// mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed"
......@@ -291,6 +292,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica")
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error")
#define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition")
#define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version")
// grant
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired"
......
......@@ -978,7 +978,9 @@ typedef struct {
} STLV;
enum {
TLV_TYPE_DUMMY = 1,
TLV_TYPE_END_MARK = -1,
//TLV_TYPE_DUMMY = 1,
TLV_TYPE_META_VERSION = 1,
};
#pragma pack(pop)
......
......@@ -229,6 +229,8 @@ typedef struct {
uint32_t numOfTables;
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
int32_t sVersion;
int32_t tVersion;
} STableGroupInfo;
#define TSDB_BLOCK_DIST_STEP_ROWS 16
......
......@@ -428,6 +428,8 @@ typedef struct SQueryParam {
int32_t tableScanOperator;
SArray *pOperator;
SUdfInfo *pUdfInfo;
int16_t schemaVersion;
int16_t tagVersion;
} SQueryParam;
typedef struct SColumnDataParam{
......
......@@ -8270,10 +8270,6 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
goto _cleanup;
}
/*
//MSG EXTEND DEMO
if (pQueryMsg->extend) {
pMsg += pQueryMsg->sqlstrLen;
......@@ -8282,19 +8278,24 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) {
tlv = (STLV *)pMsg;
tlv->type = ntohs(tlv->type);
tlv->len = ntohl(tlv->len);
if (tlv->len > 0) {
*(int16_t *)tlv->value = ntohs(*(int16_t *)tlv->value);
qDebug("Got TLV,type:%d,len:%d,value:%d", tlv->type, tlv->len, *(int16_t*)tlv->value);
pMsg += sizeof(*tlv) + tlv->len;
continue;
if (tlv->type == TLV_TYPE_END_MARK) {
break;
}
switch(tlv->type) {
case TLV_TYPE_META_VERSION: {
assert(tlv->len == 2*sizeof(int16_t));
param->schemaVersion = ntohs(*(int16_t*)tlv->value);
param->tagVersion = ntohs(*(int16_t*)(tlv->value + sizeof(int16_t)));
pMsg += sizeof(*tlv) + tlv->len;
break;
}
default: {
pMsg += sizeof(*tlv) + tlv->len;
break;
}
}
break;
}
}
*/
qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, "
"outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64,
......
......@@ -115,6 +115,8 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
bool isSTableQuery = false;
STableGroupInfo tableGroupInfo = {0};
tableGroupInfo.sVersion = -1;
tableGroupInfo.tVersion = -1;
int64_t st = taosGetTimestampUs();
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) {
......@@ -160,6 +162,16 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
assert(0);
}
int16_t queryTagVersion = param.tagVersion;
int16_t querySchemaVersion = param.schemaVersion;
if (queryTagVersion < tableGroupInfo.tVersion || querySchemaVersion < tableGroupInfo.sVersion) {
qInfo("qmsg:%p invalid schema version. client meta sversion/tversion %d/%d, table sversion/tversion %d/%d", pQueryMsg,
querySchemaVersion, queryTagVersion, tableGroupInfo.sVersion, tableGroupInfo.tVersion);
tsdbDestroyTableGroup(&tableGroupInfo);
code = TSDB_CODE_QRY_INVALID_SCHEMA_VERSION;
goto _over;
}
code = checkForQueryBuf(tableGroupInfo.numOfTables);
if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort
goto _over;
......@@ -425,7 +437,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co
*contLen = *contLen - origSize + compSize;
*pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen);
qDebug("QInfo:0x%"PRIx64" compress col data, uncompressed size:%d, compressed size:%d, ratio:%.2f",
pQInfo->qId, origSize, compSize, (float)origSize / (float)compSize);
pQInfo->qId, origSize, compSize, (float)origSize / (float)compSize);
}
(*pRsp)->compLen = htonl(compLen);
......
......@@ -3970,7 +3970,7 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, cons
//NOTE: not add ref count for super table
res = taosArrayInit(8, sizeof(STableKeyInfo));
STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);
assert(pTagSchema != NULL);
// no tags and tbname condition, all child tables of this stable are involved
if (pTagCond == NULL || len == 0) {
int32_t ret = getAllTableList(pTable, res);
......@@ -3981,7 +3981,8 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, cons
pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
pGroupInfo->sVersion = tsdbGetTableSchema(pTable)->version;
pGroupInfo->tVersion = pTagSchema->version;
tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu", tsdb,
pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
......@@ -4068,6 +4069,11 @@ int32_t tsdbGetOneTableGroup(STsdbRepo* tsdb, uint64_t uid, TSKEY startKey, STab
taosArrayPush(group, &info);
taosArrayPush(pGroupInfo->pGroupList, &group);
pGroupInfo->sVersion = tsdbGetTableSchema(pTable)->version;
if (tsdbGetTableTagSchema(pTable) != NULL) {
pGroupInfo->tVersion = tsdbGetTableTagSchema(pTable)->version;
}
return TSDB_CODE_SUCCESS;
_error:
......@@ -4084,6 +4090,8 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
int32_t sVersion = -1;
int32_t tVersion = -1;
for(int32_t i = 0; i < size; ++i) {
STableIdInfo *id = taosArrayGet(pTableIdList, i);
......@@ -4105,6 +4113,19 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl
STableKeyInfo info = {.pTable = pTable, .lastKey = id->key};
taosArrayPush(group, &info);
if (sVersion == -1) {
sVersion = tsdbGetTableSchema(pTable)->version;
} else {
assert (sVersion == tsdbGetTableSchema(pTable)->version);
}
assert(tsdbGetTableTagSchema(pTable) != NULL);
if (tVersion == -1) {
tVersion = tsdbGetTableTagSchema(pTable)->version;
} else {
assert (tVersion == tsdbGetTableTagSchema(pTable)->version);
}
}
if (tsdbUnlockRepoMeta(tsdb) < 0) {
......@@ -4119,6 +4140,9 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl
taosArrayDestroy(&group);
}
pGroupInfo->sVersion = sVersion;
pGroupInfo->tVersion = tVersion;
return TSDB_CODE_SUCCESS;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册