diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f7562a4b9b30aa3314fe19c789da15b4681322b8..38609c23483ece8ce97e4065d42952d6f58d0ab8 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -301,6 +301,8 @@ typedef struct SSchema { typedef struct { int32_t nCols; int32_t sver; + int32_t tagVer; + int32_t colVer; SSchema* pSchema; } SSchemaWrapper; @@ -309,6 +311,8 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p if (pSW == NULL) return pSW; pSW->nCols = pSchemaWrapper->nCols; pSW->sver = pSchemaWrapper->sver; + pSW->tagVer = pSchemaWrapper->tagVer; + pSW->colVer = pSchemaWrapper->colVer; pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) { taosMemoryFree(pSW); @@ -364,6 +368,8 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr int32_t tlen = 0; tlen += taosEncodeVariantI32(buf, pSW->nCols); tlen += taosEncodeVariantI32(buf, pSW->sver); + tlen += taosEncodeVariantI32(buf, pSW->tagVer); + tlen += taosEncodeVariantI32(buf, pSW->colVer); for (int32_t i = 0; i < pSW->nCols; i++) { tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]); } @@ -373,6 +379,8 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) { buf = taosDecodeVariantI32(buf, &pSW->nCols); buf = taosDecodeVariantI32(buf, &pSW->sver); + buf = taosDecodeVariantI32(buf, &pSW->tagVer); + buf = taosDecodeVariantI32(buf, &pSW->colVer); pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) { return NULL; @@ -387,6 +395,8 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) { if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1; if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1; + if (tEncodeI32v(pEncoder, pSW->tagVer) < 0) return -1; + if (tEncodeI32v(pEncoder, pSW->colVer) < 0) return -1; for (int32_t i = 0; i < pSW->nCols; i++) { if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1; } @@ -397,6 +407,8 @@ static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSch static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) { if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1; pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema)); if (pSW->pSchema == NULL) return -1; @@ -410,6 +422,8 @@ static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWra static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) { if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1; if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->tagVer) < 0) return -1; + if (tDecodeI32v(pDecoder, &pSW->colVer) < 0) return -1; pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema)); if (pSW->pSchema == NULL) return -1; @@ -455,6 +469,7 @@ int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq); typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; + int32_t verInBlock; int32_t numOfFields; SArray* pFields; int32_t ttl; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 278fc2c193830dfa199f07c4598c80ce8576b4ec..677fbb43b4d88f195460678cf5b3a99f7358ea5d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -600,6 +600,7 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq) if (tStartEncode(&encoder) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeI8(&encoder, pReq->alterType) < 0) return -1; + if (tEncodeI32(&encoder, pReq->verInBlock) < 0) return -1; if (tEncodeI32(&encoder, pReq->numOfFields) < 0) return -1; for (int32_t i = 0; i < pReq->numOfFields; ++i) { SField *pField = taosArrayGet(pReq->pFields, i); @@ -626,6 +627,7 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq if (tStartDecode(&decoder) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; if (tDecodeI8(&decoder, &pReq->alterType) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->verInBlock) < 0) return -1; if (tDecodeI32(&decoder, &pReq->numOfFields) < 0) return -1; pReq->pFields = taosArrayInit(pReq->numOfFields, sizeof(SField)); if (pReq->pFields == NULL) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a04417736a32d01619a011b17eab23291460ab82..81f4c5ed1ef87431b639d256acde0faa596692fe 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -365,6 +365,8 @@ typedef struct { int64_t uid; int64_t dbUid; int32_t version; + int32_t tagVer; + int32_t colVer; int32_t nextColId; float xFilesFactor; int32_t delay; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index e8a0b31e2f902f4ba878cf3384b5ab47d9801b33..7485510bc6f5db1873dced18e911adfe493a5d24 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -88,6 +88,8 @@ SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_INT64(pRaw, dataPos, pStb->uid, _OVER) SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->version, _OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->tagVer, _OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->colVer, _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, _OVER) SDB_SET_INT32(pRaw, dataPos, (int32_t)(pStb->xFilesFactor * 10000), _OVER) SDB_SET_INT32(pRaw, dataPos, pStb->delay, _OVER) @@ -166,6 +168,8 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pStb->uid, _OVER) SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->version, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pStb->tagVer, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pStb->colVer, _OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, _OVER) int32_t xFilesFactor = 0; SDB_GET_INT32(pRaw, dataPos, &xFilesFactor, _OVER) @@ -317,6 +321,8 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { pOld->updateTime = pNew->updateTime; pOld->version = pNew->version; + pOld->tagVer = pNew->tagVer; + pOld->colVer = pNew->colVer; pOld->nextColId = pNew->nextColId; pOld->ttl = pNew->ttl; pOld->numOfColumns = pNew->numOfColumns; @@ -384,6 +390,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt req.rollup = pStb->ast1Len > 0 ? 1 : 0; req.schema.nCols = pStb->numOfColumns; req.schema.sver = pStb->version; + req.schema.tagVer = pStb->tagVer; + req.schema.colVer = pStb->colVer; req.schema.pSchema = pStb->pColumns; req.schemaTag.nCols = pStb->numOfTags; req.schemaTag.sver = 1; @@ -657,6 +665,8 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pDst->uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); pDst->dbUid = pDb->uid; pDst->version = 1; + pDst->tagVer = 1; + pDst->colVer = 1; pDst->nextColId = 1; pDst->xFilesFactor = pCreate->xFilesFactor; pDst->delay = pCreate->delay; @@ -949,6 +959,7 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p } pNew->version++; + pNew->tagVer++; return 0; } @@ -967,6 +978,7 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch pNew->numOfTags--; pNew->version++; + pNew->tagVer++; mDebug("stb:%s, start to drop tag %s", pNew->name, tagName); return 0; } @@ -1007,6 +1019,7 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pF memcpy(pSchema->name, newTagName, TSDB_COL_NAME_LEN); pNew->version++; + pNew->tagVer++; mDebug("stb:%s, start to modify tag %s to %s", pNew->name, oldTagName, newTagName); return 0; } @@ -1036,6 +1049,7 @@ static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SFi pTag->bytes = pField->bytes; pNew->version++; + pNew->tagVer++; mDebug("stb:%s, start to modify tag len %s to %d", pNew->name, pField->name, pField->bytes); return 0; @@ -1075,6 +1089,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray } pNew->version++; + pNew->colVer++; return 0; } @@ -1103,6 +1118,7 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const pNew->numOfColumns--; pNew->version++; + pNew->colVer++; mDebug("stb:%s, start to drop col %s", pNew->name, colName); return 0; } @@ -1141,6 +1157,7 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const pCol->bytes = pField->bytes; pNew->version++; + pNew->colVer++; mDebug("stb:%s, start to modify col len %s to %d", pNew->name, pField->name, pField->bytes); return 0; @@ -1300,6 +1317,13 @@ static int32_t mndProcessMAlterStbReq(SRpcMsg *pReq) { goto _OVER; } + if (alterReq.verInBlock > 0 && alterReq.verInBlock <= pStb->version) { + mDebug("stb:%s, already exist, verInBlock:%d smaller than verInStb:%d, alter success", alterReq.name, + alterReq.verInBlock, pStb->version); + code = 0; + goto _OVER; + } + pUser = mndAcquireUser(pMnode, pReq->conn.user); if (pUser == NULL) { goto _OVER; diff --git a/source/dnode/mnode/impl/test/stb/stb.cpp b/source/dnode/mnode/impl/test/stb/stb.cpp index 16974ad54158e29464d29af9f3deede38ca3b1a8..b8873210ab995bde06f6c2baf17d14975bc591e1 100644 --- a/source/dnode/mnode/impl/test/stb/stb.cpp +++ b/source/dnode/mnode/impl/test/stb/stb.cpp @@ -32,7 +32,7 @@ class MndTestStb : public ::testing::Test { void* BuildAlterStbUpdateTagBytesReq(const char* stbname, const char* tagname, int32_t bytes, int32_t* pContLen); void* BuildAlterStbAddColumnReq(const char* stbname, const char* colname, int32_t* pContLen); void* BuildAlterStbDropColumnReq(const char* stbname, const char* colname, int32_t* pContLen); - void* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, int32_t* pContLen); + void* BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, int32_t* pContLen, int32_t verInBlock); }; Testbase MndTestStb::test; @@ -271,12 +271,13 @@ void* MndTestStb::BuildAlterStbDropColumnReq(const char* stbname, const char* co } void* MndTestStb::BuildAlterStbUpdateColumnBytesReq(const char* stbname, const char* colname, int32_t bytes, - int32_t* pContLen) { + int32_t* pContLen, int32_t verInBlock) { SMAlterStbReq req = {0}; strcpy(req.name, stbname); req.numOfFields = 1; req.pFields = taosArrayInit(1, sizeof(SField)); req.alterType = TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES; + req.verInBlock = verInBlock; SField field = {0}; field.bytes = bytes; @@ -781,31 +782,40 @@ TEST_F(MndTestStb, 08_Alter_Stb_AlterTagBytes) { } { - void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col5", 12, &contLen); + void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col5", 12, &contLen, 0); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_COLUMN_NOT_EXIST); } { - void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "ts", 8, &contLen); + void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "ts", 8, &contLen, 0); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_STB_OPTION); } { - void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 8, &contLen); + void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 8, &contLen, 0); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_ROW_BYTES); } { - void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", TSDB_MAX_BYTES_PER_ROW, &contLen); + void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", TSDB_MAX_BYTES_PER_ROW, &contLen, 0); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); ASSERT_EQ(pRsp->code, TSDB_CODE_MND_INVALID_ROW_BYTES); } { - void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 20, &contLen); + void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col1", 20, &contLen, 0); + SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); + ASSERT_EQ(pRsp->code, 0); + + test.SendShowReq(TSDB_MGMT_TABLE_STB, "user_stables", dbname); + EXPECT_EQ(test.GetShowRows(), 1); + } + + { + void* pReq = BuildAlterStbUpdateColumnBytesReq(stbname, "col_not_exist", 20, &contLen, 1); SRpcMsg* pRsp = test.SendReq(TDMT_MND_ALTER_STB, pReq, contLen); ASSERT_EQ(pRsp->code, 0);