diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c641fbb1a3b2c4ff877179dc10a9e50f7df76d48..8a17d9d710d29513cadf4ecac470a72a14c4629e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -265,7 +265,8 @@ typedef struct { typedef struct { char name[TSDB_TABLE_FNAME_LEN]; int8_t alterType; - SSchema schema; + int32_t numOfColumns; + SSchema pSchema[]; } SMAlterStbReq; typedef struct { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index e8a9a68466919c0bd77857467e9f859635edb67f..c9228ae78552b3c8ebd663629fe8ee981bf4c1e8 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -301,10 +301,12 @@ typedef struct { uint64_t uid; uint64_t dbUid; int32_t version; + int32_t nextColId; int32_t numOfColumns; int32_t numOfTags; + SSchema* pTags; + SSchema* pColumns; SRWLatch lock; - SSchema* pSchema; } SStbObj; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 4ccd4b63c44c013150ac99d86acd72cb47824d80..7e025602caa01da422f53bd212088fc13b4f4bdd 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -84,12 +84,20 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb) { SDB_SET_INT64(pRaw, dataPos, pStb->uid, STB_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->version, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER) - int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; - for (int32_t i = 0; i < totalCols; ++i) { - SSchema *pSchema = &pStb->pSchema[i]; + for (int32_t i = 0; i < pStb->numOfColumns; ++i) { + SSchema *pSchema = &pStb->pColumns[i]; + SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER) + } + + for (int32_t i = 0; i < pStb->numOfTags; ++i) { + SSchema *pSchema = &pStb->pTags[i]; SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER) @@ -137,17 +145,26 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pStb->uid, STB_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->version, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER) - int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; - pStb->pSchema = calloc(totalCols, sizeof(SSchema)); - if (pStb->pSchema == NULL) { + pStb->pColumns = calloc(pStb->numOfColumns, sizeof(SSchema)); + pStb->pTags = calloc(pStb->numOfTags, sizeof(SSchema)); + if (pStb->pColumns == NULL || pStb->pTags == NULL) { goto STB_DECODE_OVER; } - for (int32_t i = 0; i < totalCols; ++i) { - SSchema *pSchema = &pStb->pSchema[i]; + for (int32_t i = 0; i < pStb->numOfColumns; ++i) { + SSchema *pSchema = &pStb->pColumns[i]; + SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER) + SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER) + SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER) + } + + for (int32_t i = 0; i < pStb->numOfTags; ++i) { + SSchema *pSchema = &pStb->pTags[i]; SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER) @@ -183,13 +200,24 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew); taosWLockLatch(&pOld->lock); - int32_t totalCols = pNew->numOfTags + pNew->numOfColumns; - int32_t totalSize = totalCols * sizeof(SSchema); - if (pOld->numOfTags + pOld->numOfColumns < totalCols) { - void *pSchema = malloc(totalSize); + + if (pOld->numOfColumns < pNew->numOfColumns) { + void *pSchema = malloc(pOld->numOfColumns * sizeof(SSchema)); if (pSchema != NULL) { - free(pOld->pSchema); - pOld->pSchema = pSchema; + free(pOld->pColumns); + pOld->pColumns = pSchema; + } else { + terrno = TSDB_CODE_OUT_OF_MEMORY; + mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr()); + taosWUnLockLatch(&pOld->lock); + } + } + + if (pOld->numOfTags < pNew->numOfTags) { + void *pSchema = malloc(pOld->numOfTags * sizeof(SSchema)); + if (pSchema != NULL) { + free(pOld->pTags); + pOld->pTags = pSchema; } else { terrno = TSDB_CODE_OUT_OF_MEMORY; mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr()); @@ -199,9 +227,11 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) { pOld->updateTime = pNew->updateTime; pOld->version = pNew->version; + pOld->nextColId = pNew->nextColId; pOld->numOfColumns = pNew->numOfColumns; pOld->numOfTags = pNew->numOfTags; - memcpy(pOld->pSchema, pNew->pSchema, totalSize); + memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema)); + memcpy(pOld->pTags, pNew->pTags, pOld->numOfTags * sizeof(SSchema)); taosWUnLockLatch(&pOld->lock); return 0; } @@ -242,9 +272,9 @@ static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb req.type = TD_SUPER_TABLE; req.stbCfg.suid = pStb->uid; req.stbCfg.nCols = pStb->numOfColumns; - req.stbCfg.pSchema = pStb->pSchema; + req.stbCfg.pSchema = pStb->pColumns; req.stbCfg.nTagCols = pStb->numOfTags; - req.stbCfg.pTagSchema = pStb->pSchema + pStb->numOfColumns; + req.stbCfg.pTagSchema = pStb->pTags; int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead); SMsgHead *pHead = malloc(contLen); @@ -442,20 +472,32 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); stbObj.dbUid = pDb->uid; stbObj.version = 1; + stbObj.nextColId = 1; stbObj.numOfColumns = pCreate->numOfColumns; stbObj.numOfTags = pCreate->numOfTags; - int32_t totalCols = stbObj.numOfColumns + stbObj.numOfTags; - int32_t totalSize = totalCols * sizeof(SSchema); - stbObj.pSchema = malloc(totalSize); - if (stbObj.pSchema == NULL) { + stbObj.pColumns = malloc(stbObj.numOfColumns * sizeof(SSchema)); + if (stbObj.pColumns == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - memcpy(stbObj.pSchema, pCreate->pSchema, totalSize); + memcpy(stbObj.pColumns, pCreate->pSchema, stbObj.numOfColumns * sizeof(SSchema)); - for (int32_t i = 0; i < totalCols; ++i) { - stbObj.pSchema[i].colId = i + 1; + stbObj.pTags = malloc(stbObj.numOfTags * sizeof(SSchema)); + if (stbObj.pTags == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(stbObj.pTags, pCreate->pSchema + stbObj.numOfColumns, stbObj.numOfTags * sizeof(SSchema)); + + for (int32_t i = 0; i < stbObj.numOfColumns; ++i) { + stbObj.pColumns[i].colId = stbObj.nextColId; + stbObj.nextColId++; + } + + for (int32_t i = 0; i < stbObj.numOfTags; ++i) { + stbObj.pTags[i].colId = stbObj.nextColId; + stbObj.nextColId++; } int32_t code = -1; @@ -538,25 +580,29 @@ static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) { } static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) { - SSchema *pSchema = &pAlter->schema; - pSchema->colId = htonl(pSchema->colId); - pSchema->bytes = htonl(pSchema->bytes); + pAlter->numOfColumns = htonl(pAlter->numOfColumns); - if (pSchema->type <= 0) { - terrno = TSDB_CODE_MND_INVALID_STB_OPTION; - return -1; - } - if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) { - terrno = TSDB_CODE_MND_INVALID_STB_OPTION; - return -1; - } - if (pSchema->bytes <= 0) { - terrno = TSDB_CODE_MND_INVALID_STB_OPTION; - return -1; - } - if (pSchema->name[0] == 0) { - terrno = TSDB_CODE_MND_INVALID_STB_OPTION; - return -1; + for (int32_t i = 0; i < pAlter->numOfColumns; ++i) { + SSchema *pSchema = &pAlter->pSchema[i]; + pSchema->colId = htonl(pSchema->colId); + pSchema->bytes = htonl(pSchema->bytes); + + if (pSchema->type <= 0) { + terrno = TSDB_CODE_MND_INVALID_STB_OPTION; + return -1; + } + if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) { + terrno = TSDB_CODE_MND_INVALID_STB_OPTION; + return -1; + } + if (pSchema->bytes <= 0) { + terrno = TSDB_CODE_MND_INVALID_STB_OPTION; + return -1; + } + if (pSchema->name[0] == 0) { + terrno = TSDB_CODE_MND_INVALID_STB_OPTION; + return -1; + } } return 0; @@ -769,14 +815,24 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) { pMeta->suid = htobe64(pStb->uid); pMeta->tuid = htobe64(pStb->uid); - for (int32_t i = 0; i < totalCols; ++i) { + for (int32_t i = 0; i < pStb->numOfColumns; ++i) { SSchema *pSchema = &pMeta->pSchema[i]; - SSchema *pSrcSchema = &pStb->pSchema[i]; + SSchema *pSrcSchema = &pStb->pColumns[i]; + memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); + pSchema->type = pSrcSchema->type; + pSchema->colId = htonl(pSrcSchema->colId); + pSchema->bytes = htonl(pSrcSchema->bytes); + } + + for (int32_t i = 0; i < pStb->numOfTags; ++i) { + SSchema *pSchema = &pMeta->pSchema[i + pStb->numOfColumns]; + SSchema *pSrcSchema = &pStb->pTags[i]; memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); pSchema->type = pSrcSchema->type; pSchema->colId = htonl(pSrcSchema->colId); pSchema->bytes = htonl(pSrcSchema->bytes); } + taosRUnLockLatch(&pStb->lock); mndReleaseDb(pMnode, pDb); mndReleaseStb(pMnode, pStb); @@ -789,11 +845,11 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) { } int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen) { - SSdb *pSdb = pMnode->pSdb; - int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema)); - void *buf = malloc(bufSize); - int32_t len = 0; - int32_t contLen = 0; + SSdb *pSdb = pMnode->pSdb; + int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema)); + void *buf = malloc(bufSize); + int32_t len = 0; + int32_t contLen = 0; STableMetaRsp *pRsp = NULL; for (int32_t i = 0; i < num; ++i) { @@ -803,7 +859,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num stb->tversion = ntohs(stb->tversion); if ((contLen + sizeof(STableMetaRsp)) > bufSize) { - bufSize = contLen + (num -i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema)); + bufSize = contLen + (num - i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema)); buf = realloc(buf, bufSize); } @@ -812,9 +868,9 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num strcpy(pRsp->dbFName, stb->dbFName); strcpy(pRsp->tbName, stb->stbName); strcpy(pRsp->stbName, stb->stbName); - + mDebug("start to retrieve meta, db:%s, stb:%s", stb->dbFName, stb->stbName); - + SDbObj *pDb = mndAcquireDb(pMnode, stb->dbFName); if (pDb == NULL) { pRsp->numOfColumns = -1; @@ -826,7 +882,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; snprintf(tbFName, sizeof(tbFName), "%s.%s", stb->dbFName, stb->stbName); - + SStbObj *pStb = mndAcquireStb(pMnode, tbFName); if (pStb == NULL) { mndReleaseDb(pMnode, pDb); @@ -836,7 +892,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num mWarn("stb:%s, failed to get meta since %s", tbFName, terrstr()); continue; } - + taosRLockLatch(&pStb->lock); if (stb->suid == pStb->uid && stb->sversion == pStb->version) { @@ -845,17 +901,17 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num mndReleaseStb(pMnode, pStb); continue; } - + int32_t totalCols = pStb->numOfColumns + pStb->numOfTags; int32_t len = totalCols * sizeof(SSchema); - + contLen += sizeof(STableMetaRsp) + len; - + if (contLen > bufSize) { - bufSize = contLen + (num -i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema)); + bufSize = contLen + (num - i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema)); buf = realloc(buf, bufSize); } - + pRsp->numOfTags = htonl(pStb->numOfTags); pRsp->numOfColumns = htonl(pStb->numOfColumns); pRsp->precision = pDb->cfg.precision; @@ -864,15 +920,25 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num pRsp->sversion = htonl(pStb->version); pRsp->suid = htobe64(pStb->uid); pRsp->tuid = htobe64(pStb->uid); - - for (int32_t i = 0; i < totalCols; ++i) { + + for (int32_t i = 0; i < pStb->numOfColumns; ++i) { SSchema *pSchema = &pRsp->pSchema[i]; - SSchema *pSrcSchema = &pStb->pSchema[i]; + SSchema *pSrcSchema = &pStb->pColumns[i]; + memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); + pSchema->type = pSrcSchema->type; + pSchema->colId = htonl(pSrcSchema->colId); + pSchema->bytes = htonl(pSrcSchema->bytes); + } + + for (int32_t i = 0; i < pStb->numOfTags; ++i) { + SSchema *pSchema = &pRsp->pSchema[i + pStb->numOfColumns]; + SSchema *pSrcSchema = &pStb->pTags[i]; memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); pSchema->type = pSrcSchema->type; pSchema->colId = htonl(pSrcSchema->colId); pSchema->bytes = htonl(pSrcSchema->bytes); } + taosRUnLockLatch(&pStb->lock); mndReleaseDb(pMnode, pDb); mndReleaseStb(pMnode, pStb); @@ -890,7 +956,6 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num return 0; } - static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) { SSdb *pSdb = pMnode->pSdb;