diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index b583ee0b01a7b5c38c310d3c08d2f06e7d40e663..ca6b1dd206f7711e6fe268b8a2448d1daa38287c 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -207,7 +207,7 @@ TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index); void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo); -int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); +int32_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index); void tscFieldInfoClear(SFieldInfo* pFieldInfo); void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 7f8e9066af2ea05dd2cc50d8a9e156a5b44cb6cc..78b038ab19815a7e00f9e30bc0a98e8b792af7b7 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -418,7 +418,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { // single table query error need to be handled here. if ((cmd == TSDB_SQL_SELECT || cmd == TSDB_SQL_UPDATE_TAGS_VAL) && (((rpcMsg->code == TSDB_CODE_TDB_INVALID_TABLE_ID || rpcMsg->code == TSDB_CODE_VND_INVALID_VGROUP_ID)) || - rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY)) { + (rpcMsg->code == TSDB_CODE_QRY_INVALID_SCHEMA_VERSION) || + rpcMsg->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || rpcMsg->code == TSDB_CODE_APP_NOT_READY )) { // 1. super table subquery // 2. nest queries are all not updated the tablemeta and retry parse the sql after cleanup local tablemeta/vgroup id buffer @@ -511,6 +512,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) { } bool shouldFree = tscShouldBeFreed(pSql); + if (rpcMsg->code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (rpcMsg->code != TSDB_CODE_SUCCESS) { pRes->code = rpcMsg->code; @@ -962,7 +964,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pQueryMsg->numOfGroupCols = htons(pQueryInfo->groupbyExpr.numOfGroupCols); pQueryMsg->queryType = htonl(pQueryInfo->type); pQueryMsg->prevResultLen = htonl(pQueryInfo->bufLen); - + // set column list ids size_t numOfCols = taosArrayGetSize(pQueryInfo->colList); char *pMsg = (char *)(pQueryMsg->tableCols) + numOfCols * sizeof(SColumnInfo); @@ -1148,21 +1150,21 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) { pMsg += sqlLen; -/* - //MSG EXTEND DEMO + pQueryMsg->extend = 1; STLV *tlv = (STLV *)pMsg; - tlv->type = htons(TLV_TYPE_DUMMY); - tlv->len = htonl(sizeof(int16_t)); - *(int16_t *)tlv->value = htons(12345); + tlv->type = htons(TLV_TYPE_META_VERSION); + tlv->len = htonl(sizeof(int16_t) * 2); + *(int16_t*)tlv->value = htons(pTableMeta->sversion); + *(int16_t*)(tlv->value+sizeof(int16_t)) = htons(pTableMeta->tversion); pMsg += sizeof(*tlv) + ntohl(tlv->len); tlv = (STLV *)pMsg; + tlv->type = htons(TLV_TYPE_END_MARK); tlv->len = 0; pMsg += sizeof(*tlv); -*/ int32_t msgLen = (int32_t)(pMsg - pCmd->payload); @@ -2691,7 +2693,9 @@ int tscProcessQueryRsp(SSqlObj *pSql) { pRes->data = NULL; tscResetForNextRetrieve(pRes); + tscDebug("0x%"PRIx64" query rsp received, qId:0x%"PRIx64, pSql->self, pRes->qId); + return 0; } @@ -2703,7 +2707,7 @@ static void decompressQueryColData(SSqlObj *pSql, SSqlRes *pRes, SQueryInfo* pQu compSizes = (int32_t *)(pData + compLen); TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, numOfCols - 1); - int16_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); + int32_t offset = tscFieldInfoGetOffset(pQueryInfo, numOfCols - 1); char *outputBuf = tcalloc(pRes->numOfRows, (pField->bytes + offset)); char *p = outputBuf; @@ -2804,11 +2808,12 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) { pRes->row = 0; tscDebug("0x%"PRIx64" numOfRows:%d, offset:%" PRId64 ", complete:%d, qId:0x%"PRIx64, pSql->self, pRes->numOfRows, pRes->offset, - pRes->completed, pRes->qId); + pRes->completed, pRes->qId); return 0; } + void tscTableMetaCallBack(void *param, TAOS_RES *res, int code); static int32_t getTableMetaFromMnode(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo, bool autocreate) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index f29c3c8eb6057f9bfeec54079112957eb504dbac..691eea9033345ed4cb674a391fcfee5bc4b5391c 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -2369,7 +2369,7 @@ TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index) { return &((SInternalField*)TARRAY_GET_ELEM(pFieldInfo->internalField, index))->field; } -int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) { +int32_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) { SInternalField* pInfo = tscFieldInfoGetInternalField(&pQueryInfo->fieldsInfo, index); assert(pInfo != NULL && pInfo->pExpr->pExpr == NULL); diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index fb70badb862943a0259b2dc94bf52b0a452bd714..44192403972cd9dc54b3f2a965e1468595e17487 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -115,6 +115,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_INVALID_PROTOCOL_TYPE TAOS_DEF_ERROR_CODE(0, 0x0225) //"Invalid line protocol type") #define TSDB_CODE_TSC_INVALID_PRECISION_TYPE TAOS_DEF_ERROR_CODE(0, 0x0226) //"Invalid timestamp precision type") #define TSDB_CODE_TSC_RES_TOO_MANY TAOS_DEF_ERROR_CODE(0, 0x0227) //"Result set too large to be output") +#define TSDB_CODE_TSC_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0228) //"invalid table schema version") // mnode #define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed" @@ -291,6 +292,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica") #define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error") #define TSDB_CODE_QRY_INVALID_TIME_CONDITION TAOS_DEF_ERROR_CODE(0, 0x070E) //"invalid time condition") +#define TSDB_CODE_QRY_INVALID_SCHEMA_VERSION TAOS_DEF_ERROR_CODE(0, 0x0710) //"invalid schema version") // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired" diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 9dc76466aadbe9781dbdd727a524a32f8103650f..26ce551e397fccfe6eb378aa0de2de771dfae10f 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -978,7 +978,9 @@ typedef struct { } STLV; enum { - TLV_TYPE_DUMMY = 1, + TLV_TYPE_END_MARK = -1, + //TLV_TYPE_DUMMY = 1, + TLV_TYPE_META_VERSION = 1, }; #pragma pack(pop) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index d9c93e8cce4fd54e3417b7d335d05cfcba185c42..eeff90bd5399c1ff2e08b1254fc63c9e53d3cbc3 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -229,6 +229,8 @@ typedef struct { uint32_t numOfTables; SArray *pGroupList; SHashObj *map; // speedup acquire the tableQueryInfo by table uid + int32_t sVersion; + int32_t tVersion; } STableGroupInfo; #define TSDB_BLOCK_DIST_STEP_ROWS 16 diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h index f399cbc7e12bd7f54b34bb03e792f8b9023870ec..0b938078e39e8a61d3c2d871192717fdc4dc82e7 100644 --- a/src/query/inc/qExecutor.h +++ b/src/query/inc/qExecutor.h @@ -428,6 +428,8 @@ typedef struct SQueryParam { int32_t tableScanOperator; SArray *pOperator; SUdfInfo *pUdfInfo; + int16_t schemaVersion; + int16_t tagVersion; } SQueryParam; typedef struct SColumnDataParam{ diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index 4a9b977d0f002ec1ffe5c19819f0c9d73914629b..caa199f2ab2a463baaa8c222bec93fa826f8d145 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -8270,10 +8270,6 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { goto _cleanup; } - - -/* - //MSG EXTEND DEMO if (pQueryMsg->extend) { pMsg += pQueryMsg->sqlstrLen; @@ -8282,19 +8278,24 @@ int32_t convertQueryMsg(SQueryTableMsg *pQueryMsg, SQueryParam* param) { tlv = (STLV *)pMsg; tlv->type = ntohs(tlv->type); tlv->len = ntohl(tlv->len); - if (tlv->len > 0) { - *(int16_t *)tlv->value = ntohs(*(int16_t *)tlv->value); - qDebug("Got TLV,type:%d,len:%d,value:%d", tlv->type, tlv->len, *(int16_t*)tlv->value); - pMsg += sizeof(*tlv) + tlv->len; - continue; + if (tlv->type == TLV_TYPE_END_MARK) { + break; + } + switch(tlv->type) { + case TLV_TYPE_META_VERSION: { + assert(tlv->len == 2*sizeof(int16_t)); + param->schemaVersion = ntohs(*(int16_t*)tlv->value); + param->tagVersion = ntohs(*(int16_t*)(tlv->value + sizeof(int16_t))); + pMsg += sizeof(*tlv) + tlv->len; + break; + } + default: { + pMsg += sizeof(*tlv) + tlv->len; + break; + } } - - break; } } - -*/ - qDebug("qmsg:%p query %d tables, type:%d, qrange:%" PRId64 "-%" PRId64 ", numOfGroupbyTagCols:%d, order:%d, " "outputCols:%d, numOfCols:%d, interval:%" PRId64 ", fillType:%d, comptsLen:%d, compNumOfBlocks:%d, limit:%" PRId64 ", offset:%" PRId64, diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c index f7a895e2c08d8b9dd3e0d72c66f118b61b29bc47..a481f99cc8b4526a0f12dd73532ede8ccc8a53f8 100644 --- a/src/query/src/queryMain.c +++ b/src/query/src/queryMain.c @@ -115,6 +115,8 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi bool isSTableQuery = false; STableGroupInfo tableGroupInfo = {0}; + tableGroupInfo.sVersion = -1; + tableGroupInfo.tVersion = -1; int64_t st = taosGetTimestampUs(); if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) { @@ -160,6 +162,16 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi assert(0); } + int16_t queryTagVersion = param.tagVersion; + int16_t querySchemaVersion = param.schemaVersion; + if (queryTagVersion < tableGroupInfo.tVersion || querySchemaVersion < tableGroupInfo.sVersion) { + qInfo("qmsg:%p invalid schema version. client meta sversion/tversion %d/%d, table sversion/tversion %d/%d", pQueryMsg, + querySchemaVersion, queryTagVersion, tableGroupInfo.sVersion, tableGroupInfo.tVersion); + tsdbDestroyTableGroup(&tableGroupInfo); + code = TSDB_CODE_QRY_INVALID_SCHEMA_VERSION; + goto _over; + } + code = checkForQueryBuf(tableGroupInfo.numOfTables); if (code != TSDB_CODE_SUCCESS) { // not enough query buffer, abort goto _over; @@ -425,7 +437,7 @@ int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp **pRsp, int32_t *co *contLen = *contLen - origSize + compSize; *pRsp = (SRetrieveTableRsp *)rpcReallocCont(*pRsp, *contLen); qDebug("QInfo:0x%"PRIx64" compress col data, uncompressed size:%d, compressed size:%d, ratio:%.2f", - pQInfo->qId, origSize, compSize, (float)origSize / (float)compSize); + pQInfo->qId, origSize, compSize, (float)origSize / (float)compSize); } (*pRsp)->compLen = htonl(compLen); diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index cf4d190ff3511ed25cc49fc34296e2dcd3dfb3b4..1423b68ce59b2d7f22b40c884ad144d6496e69d8 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -3970,7 +3970,7 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, cons //NOTE: not add ref count for super table res = taosArrayInit(8, sizeof(STableKeyInfo)); STSchema* pTagSchema = tsdbGetTableTagSchema(pTable); - + assert(pTagSchema != NULL); // no tags and tbname condition, all child tables of this stable are involved if (pTagCond == NULL || len == 0) { int32_t ret = getAllTableList(pTable, res); @@ -3981,7 +3981,8 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, cons pGroupInfo->numOfTables = (uint32_t) taosArrayGetSize(res); pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey); - + pGroupInfo->sVersion = tsdbGetTableSchema(pTable)->version; + pGroupInfo->tVersion = pTagSchema->version; tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu", tsdb, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList)); @@ -4068,6 +4069,11 @@ int32_t tsdbGetOneTableGroup(STsdbRepo* tsdb, uint64_t uid, TSKEY startKey, STab taosArrayPush(group, &info); taosArrayPush(pGroupInfo->pGroupList, &group); + + pGroupInfo->sVersion = tsdbGetTableSchema(pTable)->version; + if (tsdbGetTableTagSchema(pTable) != NULL) { + pGroupInfo->tVersion = tsdbGetTableTagSchema(pTable)->version; + } return TSDB_CODE_SUCCESS; _error: @@ -4084,6 +4090,8 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES); SArray* group = taosArrayInit(1, sizeof(STableKeyInfo)); + int32_t sVersion = -1; + int32_t tVersion = -1; for(int32_t i = 0; i < size; ++i) { STableIdInfo *id = taosArrayGet(pTableIdList, i); @@ -4105,6 +4113,19 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl STableKeyInfo info = {.pTable = pTable, .lastKey = id->key}; taosArrayPush(group, &info); + + if (sVersion == -1) { + sVersion = tsdbGetTableSchema(pTable)->version; + } else { + assert (sVersion == tsdbGetTableSchema(pTable)->version); + } + + assert(tsdbGetTableTagSchema(pTable) != NULL); + if (tVersion == -1) { + tVersion = tsdbGetTableTagSchema(pTable)->version; + } else { + assert (tVersion == tsdbGetTableTagSchema(pTable)->version); + } } if (tsdbUnlockRepoMeta(tsdb) < 0) { @@ -4119,6 +4140,9 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl taosArrayDestroy(&group); } + pGroupInfo->sVersion = sVersion; + pGroupInfo->tVersion = tVersion; + return TSDB_CODE_SUCCESS; }