未验证 提交 a56e831c 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #10137 from taosdata/feature/mnode

update stb
......@@ -265,7 +265,8 @@ typedef struct {
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType;
SSchema schema;
int32_t numOfColumns;
SSchema pSchema[];
} SMAlterStbReq;
typedef struct {
......
......@@ -301,10 +301,12 @@ typedef struct {
uint64_t uid;
uint64_t dbUid;
int32_t version;
int32_t nextColId;
int32_t numOfColumns;
int32_t numOfTags;
SSchema* pTags;
SSchema* pColumns;
SRWLatch lock;
SSchema* pSchema;
} SStbObj;
typedef struct {
......
......@@ -84,12 +84,20 @@ static SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SDB_SET_INT64(pRaw, dataPos, pStb->uid, STB_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pStb->dbUid, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->version, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->nextColId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfColumns, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pStb->numOfTags, STB_ENCODE_OVER)
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pStb->pSchema[i];
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_ENCODE_OVER)
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->pTags[i];
SDB_SET_INT8(pRaw, dataPos, pSchema->type, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->colId, STB_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pSchema->bytes, STB_ENCODE_OVER)
......@@ -137,17 +145,26 @@ static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pStb->uid, STB_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pStb->dbUid, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->version, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->nextColId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfColumns, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pStb->numOfTags, STB_DECODE_OVER)
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
pStb->pSchema = calloc(totalCols, sizeof(SSchema));
if (pStb->pSchema == NULL) {
pStb->pColumns = calloc(pStb->numOfColumns, sizeof(SSchema));
pStb->pTags = calloc(pStb->numOfTags, sizeof(SSchema));
if (pStb->pColumns == NULL || pStb->pTags == NULL) {
goto STB_DECODE_OVER;
}
for (int32_t i = 0; i < totalCols; ++i) {
SSchema *pSchema = &pStb->pSchema[i];
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pStb->pColumns[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pSchema->name, TSDB_COL_NAME_LEN, STB_DECODE_OVER)
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pStb->pTags[i];
SDB_GET_INT8(pRaw, dataPos, &pSchema->type, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->colId, STB_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pSchema->bytes, STB_DECODE_OVER)
......@@ -183,13 +200,24 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
mTrace("stb:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);
taosWLockLatch(&pOld->lock);
int32_t totalCols = pNew->numOfTags + pNew->numOfColumns;
int32_t totalSize = totalCols * sizeof(SSchema);
if (pOld->numOfTags + pOld->numOfColumns < totalCols) {
void *pSchema = malloc(totalSize);
if (pOld->numOfColumns < pNew->numOfColumns) {
void *pSchema = malloc(pOld->numOfColumns * sizeof(SSchema));
if (pSchema != NULL) {
free(pOld->pSchema);
pOld->pSchema = pSchema;
free(pOld->pColumns);
pOld->pColumns = pSchema;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
taosWUnLockLatch(&pOld->lock);
}
}
if (pOld->numOfTags < pNew->numOfTags) {
void *pSchema = malloc(pOld->numOfTags * sizeof(SSchema));
if (pSchema != NULL) {
free(pOld->pTags);
pOld->pTags = pSchema;
} else {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mTrace("stb:%s, failed to perform update action since %s", pOld->name, terrstr());
......@@ -199,9 +227,11 @@ static int32_t mndStbActionUpdate(SSdb *pSdb, SStbObj *pOld, SStbObj *pNew) {
pOld->updateTime = pNew->updateTime;
pOld->version = pNew->version;
pOld->nextColId = pNew->nextColId;
pOld->numOfColumns = pNew->numOfColumns;
pOld->numOfTags = pNew->numOfTags;
memcpy(pOld->pSchema, pNew->pSchema, totalSize);
memcpy(pOld->pColumns, pNew->pColumns, pOld->numOfColumns * sizeof(SSchema));
memcpy(pOld->pTags, pNew->pTags, pOld->numOfTags * sizeof(SSchema));
taosWUnLockLatch(&pOld->lock);
return 0;
}
......@@ -242,9 +272,9 @@ static void *mndBuildCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pStb
req.type = TD_SUPER_TABLE;
req.stbCfg.suid = pStb->uid;
req.stbCfg.nCols = pStb->numOfColumns;
req.stbCfg.pSchema = pStb->pSchema;
req.stbCfg.pSchema = pStb->pColumns;
req.stbCfg.nTagCols = pStb->numOfTags;
req.stbCfg.pTagSchema = pStb->pSchema + pStb->numOfColumns;
req.stbCfg.pTagSchema = pStb->pTags;
int32_t contLen = tSerializeSVCreateTbReq(NULL, &req) + sizeof(SMsgHead);
SMsgHead *pHead = malloc(contLen);
......@@ -442,20 +472,32 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pReq, SMCreateStbReq *pCr
stbObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN);
stbObj.dbUid = pDb->uid;
stbObj.version = 1;
stbObj.nextColId = 1;
stbObj.numOfColumns = pCreate->numOfColumns;
stbObj.numOfTags = pCreate->numOfTags;
int32_t totalCols = stbObj.numOfColumns + stbObj.numOfTags;
int32_t totalSize = totalCols * sizeof(SSchema);
stbObj.pSchema = malloc(totalSize);
if (stbObj.pSchema == NULL) {
stbObj.pColumns = malloc(stbObj.numOfColumns * sizeof(SSchema));
if (stbObj.pColumns == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(stbObj.pSchema, pCreate->pSchema, totalSize);
memcpy(stbObj.pColumns, pCreate->pSchema, stbObj.numOfColumns * sizeof(SSchema));
for (int32_t i = 0; i < totalCols; ++i) {
stbObj.pSchema[i].colId = i + 1;
stbObj.pTags = malloc(stbObj.numOfTags * sizeof(SSchema));
if (stbObj.pTags == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(stbObj.pTags, pCreate->pSchema + stbObj.numOfColumns, stbObj.numOfTags * sizeof(SSchema));
for (int32_t i = 0; i < stbObj.numOfColumns; ++i) {
stbObj.pColumns[i].colId = stbObj.nextColId;
stbObj.nextColId++;
}
for (int32_t i = 0; i < stbObj.numOfTags; ++i) {
stbObj.pTags[i].colId = stbObj.nextColId;
stbObj.nextColId++;
}
int32_t code = -1;
......@@ -538,25 +580,29 @@ static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
}
static int32_t mndCheckAlterStbReq(SMAlterStbReq *pAlter) {
SSchema *pSchema = &pAlter->schema;
pSchema->colId = htonl(pSchema->colId);
pSchema->bytes = htonl(pSchema->bytes);
pAlter->numOfColumns = htonl(pAlter->numOfColumns);
if (pSchema->type <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->bytes <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
for (int32_t i = 0; i < pAlter->numOfColumns; ++i) {
SSchema *pSchema = &pAlter->pSchema[i];
pSchema->colId = htonl(pSchema->colId);
pSchema->bytes = htonl(pSchema->bytes);
if (pSchema->type <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->bytes <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
}
return 0;
......@@ -769,14 +815,24 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
pMeta->suid = htobe64(pStb->uid);
pMeta->tuid = htobe64(pStb->uid);
for (int32_t i = 0; i < totalCols; ++i) {
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSrcSchema = &pStb->pSchema[i];
SSchema *pSrcSchema = &pStb->pColumns[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pMeta->pSchema[i + pStb->numOfColumns];
SSchema *pSrcSchema = &pStb->pTags[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
......@@ -789,11 +845,11 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
}
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen) {
SSdb *pSdb = pMnode->pSdb;
int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
void *buf = malloc(bufSize);
int32_t len = 0;
int32_t contLen = 0;
SSdb *pSdb = pMnode->pSdb;
int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
void *buf = malloc(bufSize);
int32_t len = 0;
int32_t contLen = 0;
STableMetaRsp *pRsp = NULL;
for (int32_t i = 0; i < num; ++i) {
......@@ -803,7 +859,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
stb->tversion = ntohs(stb->tversion);
if ((contLen + sizeof(STableMetaRsp)) > bufSize) {
bufSize = contLen + (num -i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
bufSize = contLen + (num - i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
buf = realloc(buf, bufSize);
}
......@@ -812,9 +868,9 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
strcpy(pRsp->dbFName, stb->dbFName);
strcpy(pRsp->tbName, stb->stbName);
strcpy(pRsp->stbName, stb->stbName);
mDebug("start to retrieve meta, db:%s, stb:%s", stb->dbFName, stb->stbName);
SDbObj *pDb = mndAcquireDb(pMnode, stb->dbFName);
if (pDb == NULL) {
pRsp->numOfColumns = -1;
......@@ -826,7 +882,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", stb->dbFName, stb->stbName);
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
......@@ -836,7 +892,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
mWarn("stb:%s, failed to get meta since %s", tbFName, terrstr());
continue;
}
taosRLockLatch(&pStb->lock);
if (stb->suid == pStb->uid && stb->sversion == pStb->version) {
......@@ -845,17 +901,17 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
mndReleaseStb(pMnode, pStb);
continue;
}
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
int32_t len = totalCols * sizeof(SSchema);
contLen += sizeof(STableMetaRsp) + len;
if (contLen > bufSize) {
bufSize = contLen + (num -i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
bufSize = contLen + (num - i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
buf = realloc(buf, bufSize);
}
pRsp->numOfTags = htonl(pStb->numOfTags);
pRsp->numOfColumns = htonl(pStb->numOfColumns);
pRsp->precision = pDb->cfg.precision;
......@@ -864,15 +920,25 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
pRsp->sversion = htonl(pStb->version);
pRsp->suid = htobe64(pStb->uid);
pRsp->tuid = htobe64(pStb->uid);
for (int32_t i = 0; i < totalCols; ++i) {
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pRsp->pSchema[i];
SSchema *pSrcSchema = &pStb->pSchema[i];
SSchema *pSrcSchema = &pStb->pColumns[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pRsp->pSchema[i + pStb->numOfColumns];
SSchema *pSrcSchema = &pStb->pTags[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
}
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
......@@ -890,7 +956,6 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num
return 0;
}
static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
SSdb *pSdb = pMnode->pSdb;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册