未验证 提交 099f6f9e 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #10181 from taosdata/feature/mnode

alter stb
...@@ -278,10 +278,13 @@ void* tDeserializeSMDropStbReq(void* buf, SMDropStbReq* pReq); ...@@ -278,10 +278,13 @@ void* tDeserializeSMDropStbReq(void* buf, SMDropStbReq* pReq);
typedef struct { typedef struct {
char name[TSDB_TABLE_FNAME_LEN]; char name[TSDB_TABLE_FNAME_LEN];
int8_t alterType; int8_t alterType;
int32_t numOfSchemas; int32_t numOfFields;
SSchema pSchemas[]; SArray* pFields;
} SMAltertbReq; } SMAltertbReq;
int32_t tSerializeSMAlterStbReq(void** buf, SMAltertbReq* pReq);
void* tDeserializeSMAlterStbReq(void* buf, SMAltertbReq* pReq);
typedef struct { typedef struct {
int32_t pid; int32_t pid;
char app[TSDB_APP_NAME_LEN]; char app[TSDB_APP_NAME_LEN];
......
...@@ -415,3 +415,45 @@ void *tDeserializeSMDropStbReq(void *buf, SMDropStbReq *pReq) { ...@@ -415,3 +415,45 @@ void *tDeserializeSMDropStbReq(void *buf, SMDropStbReq *pReq) {
return buf; return buf;
} }
int32_t tSerializeSMAlterStbReq(void **buf, SMAltertbReq *pReq) {
int32_t tlen = 0;
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeFixedI8(buf, pReq->alterType);
tlen += taosEncodeFixedI32(buf, pReq->numOfFields);
for (int32_t i = 0; i < pReq->numOfFields; ++i) {
SField *pField = taosArrayGet(pReq->pFields, i);
tlen += taosEncodeFixedU8(buf, pField->type);
tlen += taosEncodeFixedI32(buf, pField->bytes);
tlen += taosEncodeString(buf, pField->name);
}
return tlen;
}
void *tDeserializeSMAlterStbReq(void *buf, SMAltertbReq *pReq) {
buf = taosDecodeStringTo(buf, pReq->name);
buf = taosDecodeFixedI8(buf, &pReq->alterType);
buf = taosDecodeFixedI32(buf, &pReq->numOfFields);
pReq->pFields = taosArrayInit(pReq->numOfFields, sizeof(SField));
if (pReq->pFields == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < pReq->numOfFields; ++i) {
SField field = {0};
buf = taosDecodeFixedU8(buf, &field.type);
buf = taosDecodeFixedI32(buf, &field.bytes);
buf = taosDecodeStringTo(buf, field.name);
if (taosArrayPush(pReq->pFields, &field) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
}
return buf;
}
...@@ -601,26 +601,23 @@ static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) { ...@@ -601,26 +601,23 @@ static int32_t mndProcessVCreateStbRsp(SMnodeMsg *pRsp) {
} }
static int32_t mndCheckAlterStbReq(SMAltertbReq *pAlter) { static int32_t mndCheckAlterStbReq(SMAltertbReq *pAlter) {
pAlter->numOfSchemas = htonl(pAlter->numOfSchemas); if (pAlter->numOfFields < 1 || pAlter->numOfFields != (int32_t)taosArrayGetSize(pAlter->pFields)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
for (int32_t i = 0; i < pAlter->numOfSchemas; ++i) { for (int32_t i = 0; i < pAlter->numOfFields; ++i) {
SSchema *pSchema = &pAlter->pSchemas[i]; SField *pField = taosArrayGet(pAlter->pFields, i);
pSchema->colId = htonl(pSchema->colId);
pSchema->bytes = htonl(pSchema->bytes);
if (pSchema->type <= 0) { if (pField->type <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
if (pSchema->colId < 0 || pSchema->colId >= (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
if (pSchema->bytes <= 0) { if (pField->bytes <= 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
if (pSchema->name[0] == 0) { if (pField->name[0] == 0) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1; return -1;
} }
...@@ -662,7 +659,7 @@ static int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) { ...@@ -662,7 +659,7 @@ static int32_t mndAllocStbSchemas(const SStbObj *pOld, SStbObj *pNew) {
return 0; return 0;
} }
static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchemas, int32_t ntags) { static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ntags) {
if (pOld->numOfTags + ntags > TSDB_MAX_TAGS) { if (pOld->numOfTags + ntags > TSDB_MAX_TAGS) {
terrno = TSDB_CODE_MND_TOO_MANY_TAGS; terrno = TSDB_CODE_MND_TOO_MANY_TAGS;
return -1; return -1;
...@@ -673,33 +670,34 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const SSc ...@@ -673,33 +670,34 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const SSc
return -1; return -1;
} }
pNew->numOfTags = pNew->numOfTags + ntags;
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
for (int32_t i = 0; i < ntags; i++) { for (int32_t i = 0; i < ntags; i++) {
if (mndFindSuperTableColumnIndex(pOld, pSchemas[i].name) > 0) { SField *pField = taosArrayGet(pFields, i);
if (mndFindSuperTableColumnIndex(pOld, pField->name) > 0) {
terrno = TSDB_CODE_MND_COLUMN_ALREADY_EXIST; terrno = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
return -1; return -1;
} }
if (mndFindSuperTableTagIndex(pOld, pSchemas[i].name) > 0) { if (mndFindSuperTableTagIndex(pOld, pField->name) > 0) {
terrno = TSDB_CODE_MND_TAG_ALREADY_EXIST; terrno = TSDB_CODE_MND_TAG_ALREADY_EXIST;
return -1; return -1;
} }
}
pNew->numOfTags = pNew->numOfTags + ntags; SSchema *pSchema = &pNew->pTags[pOld->numOfTags + i];
if (mndAllocStbSchemas(pOld, pNew) != 0) { pSchema->bytes = pField->bytes;
return -1; pSchema->type = pField->type;
} memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
memcpy(pNew->pTags + pOld->numOfTags, pSchemas, sizeof(SSchema) * ntags);
for (int32_t i = pOld->numOfTags; i < pNew->numOfTags; i++) {
SSchema *pSchema = &pNew->pTags[i];
pSchema->colId = pNew->nextColId; pSchema->colId = pNew->nextColId;
pNew->nextColId++; pNew->nextColId++;
mDebug("stb:%s, start to add tag %s", pNew->name, pSchema->name);
} }
pNew->version++; pNew->version++;
mDebug("stb:%s, start to add tag %s", pNew->name, pSchemas[0].name);
return 0; return 0;
} }
...@@ -722,7 +720,18 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch ...@@ -722,7 +720,18 @@ static int32_t mndDropSuperTableTag(const SStbObj *pOld, SStbObj *pNew, const ch
return 0; return 0;
} }
static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, const char *oldTagName, const char *newTagName) { static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, SArray *pFields) {
if ((int32_t)taosArrayGetSize(pFields) != 2) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
}
SField *pField0 = taosArrayGet(pFields, 0);
SField *pField1 = taosArrayGet(pFields, 1);
const char *oldTagName = pField0->name;
const char *newTagName = pField1->name;
int32_t tag = mndFindSuperTableTagIndex(pOld, oldTagName); int32_t tag = mndFindSuperTableTagIndex(pOld, oldTagName);
if (tag < 0) { if (tag < 0) {
terrno = TSDB_CODE_MND_TAG_NOT_EXIST; terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
...@@ -751,8 +760,8 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, const char ...@@ -751,8 +760,8 @@ static int32_t mndAlterStbTagName(const SStbObj *pOld, SStbObj *pNew, const char
return 0; return 0;
} }
static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) { static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
int32_t tag = mndFindSuperTableTagIndex(pOld, pSchema->name); int32_t tag = mndFindSuperTableTagIndex(pOld, pField->name);
if (tag < 0) { if (tag < 0) {
terrno = TSDB_CODE_MND_TAG_NOT_EXIST; terrno = TSDB_CODE_MND_TAG_NOT_EXIST;
return -1; return -1;
...@@ -769,51 +778,52 @@ static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SSc ...@@ -769,51 +778,52 @@ static int32_t mndAlterStbTagBytes(const SStbObj *pOld, SStbObj *pNew, const SSc
return -1; return -1;
} }
if (pSchema->bytes <= pTag->bytes) { if (pField->bytes <= pTag->bytes) {
terrno = TSDB_CODE_MND_INVALID_ROW_BYTES; terrno = TSDB_CODE_MND_INVALID_ROW_BYTES;
return -1; return -1;
} }
pTag->bytes = pSchema->bytes; pTag->bytes = pField->bytes;
pNew->version++; pNew->version++;
mDebug("stb:%s, start to modify tag len %s to %d", pNew->name, pSchema->name, pSchema->bytes); mDebug("stb:%s, start to modify tag len %s to %d", pNew->name, pField->name, pField->bytes);
return 0; return 0;
} }
static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchemas, int32_t ncols) { static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray *pFields, int32_t ncols) {
if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) { if (pOld->numOfColumns + ncols + pOld->numOfTags > TSDB_MAX_COLUMNS) {
terrno = TSDB_CODE_MND_TOO_MANY_COLUMNS; terrno = TSDB_CODE_MND_TOO_MANY_COLUMNS;
return -1; return -1;
} }
pNew->numOfColumns = pNew->numOfColumns + ncols;
if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1;
}
for (int32_t i = 0; i < ncols; i++) { for (int32_t i = 0; i < ncols; i++) {
if (mndFindSuperTableColumnIndex(pOld, pSchemas[i].name) > 0) { SField *pField = taosArrayGet(pFields, i);
if (mndFindSuperTableColumnIndex(pOld, pField->name) > 0) {
terrno = TSDB_CODE_MND_COLUMN_ALREADY_EXIST; terrno = TSDB_CODE_MND_COLUMN_ALREADY_EXIST;
return -1; return -1;
} }
if (mndFindSuperTableTagIndex(pOld, pSchemas[i].name) > 0) { if (mndFindSuperTableTagIndex(pOld, pField->name) > 0) {
terrno = TSDB_CODE_MND_TAG_ALREADY_EXIST; terrno = TSDB_CODE_MND_TAG_ALREADY_EXIST;
return -1; return -1;
} }
}
pNew->numOfColumns = pNew->numOfColumns + ncols; SSchema *pSchema = &pNew->pColumns[pOld->numOfColumns + i];
if (mndAllocStbSchemas(pOld, pNew) != 0) { pSchema->bytes = pField->bytes;
return -1; pSchema->type = pField->type;
} memcpy(pSchema->name, pField->name, TSDB_COL_NAME_LEN);
memcpy(pNew->pColumns + pOld->numOfColumns, pSchemas, sizeof(SSchema) * ncols);
for (int32_t i = pOld->numOfColumns; i < pNew->numOfColumns; i++) {
SSchema *pSchema = &pNew->pColumns[i];
pSchema->colId = pNew->nextColId; pSchema->colId = pNew->nextColId;
pNew->nextColId++; pNew->nextColId++;
mDebug("stb:%s, start to add column %s", pNew->name, pSchema->name);
} }
pNew->version++; pNew->version++;
mDebug("stb:%s, start to add column %s", pNew->name, pSchemas[0].name);
return 0; return 0;
} }
...@@ -846,8 +856,8 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const ...@@ -846,8 +856,8 @@ static int32_t mndDropSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, const
return 0; return 0;
} }
static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SSchema *pSchema) { static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const SField *pField) {
int32_t col = mndFindSuperTableColumnIndex(pOld, pSchema->name); int32_t col = mndFindSuperTableColumnIndex(pOld, pField->name);
if (col < 0) { if (col < 0) {
terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST; terrno = TSDB_CODE_MND_COLUMN_NOT_EXIST;
return -1; return -1;
...@@ -855,7 +865,7 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const ...@@ -855,7 +865,7 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const
uint32_t nLen = 0; uint32_t nLen = 0;
for (int32_t i = 0; i < pOld->numOfColumns; ++i) { for (int32_t i = 0; i < pOld->numOfColumns; ++i) {
nLen += (pOld->pColumns[i].colId == col) ? pSchema->bytes : pOld->pColumns[i].bytes; nLen += (pOld->pColumns[i].colId == col) ? pField->bytes : pOld->pColumns[i].bytes;
} }
if (nLen > TSDB_MAX_BYTES_PER_ROW) { if (nLen > TSDB_MAX_BYTES_PER_ROW) {
...@@ -873,15 +883,15 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const ...@@ -873,15 +883,15 @@ static int32_t mndAlterStbColumnBytes(const SStbObj *pOld, SStbObj *pNew, const
return -1; return -1;
} }
if (pSchema->bytes <= pCol->bytes) { if (pField->bytes <= pCol->bytes) {
terrno = TSDB_CODE_MND_INVALID_ROW_BYTES; terrno = TSDB_CODE_MND_INVALID_ROW_BYTES;
return -1; return -1;
} }
pCol->bytes = pSchema->bytes; pCol->bytes = pField->bytes;
pNew->version++; pNew->version++;
mDebug("stb:%s, start to modify col len %s to %d", pNew->name, pSchema->name, pSchema->bytes); mDebug("stb:%s, start to modify col len %s to %d", pNew->name, pField->name, pField->bytes);
return 0; return 0;
} }
...@@ -950,28 +960,29 @@ static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq * ...@@ -950,28 +960,29 @@ static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq *
int32_t code = -1; int32_t code = -1;
STrans *pTrans = NULL; STrans *pTrans = NULL;
SField *pField0 = taosArrayGet(pAlter->pFields, 0);
switch (pAlter->alterType) { switch (pAlter->alterType) {
case TSDB_ALTER_TABLE_ADD_TAG: case TSDB_ALTER_TABLE_ADD_TAG:
code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pSchemas, 1); code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
break; break;
case TSDB_ALTER_TABLE_DROP_TAG: case TSDB_ALTER_TABLE_DROP_TAG:
code = mndDropSuperTableTag(pOld, &stbObj, pAlter->pSchemas[0].name); code = mndDropSuperTableTag(pOld, &stbObj, pField0->name);
break; break;
case TSDB_ALTER_TABLE_UPDATE_TAG_NAME: case TSDB_ALTER_TABLE_UPDATE_TAG_NAME:
code = mndAlterStbTagName(pOld, &stbObj, pAlter->pSchemas[0].name, pAlter->pSchemas[1].name); code = mndAlterStbTagName(pOld, &stbObj, pAlter->pFields);
break; break;
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES: case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
code = mndAlterStbTagBytes(pOld, &stbObj, &pAlter->pSchemas[0]); code = mndAlterStbTagBytes(pOld, &stbObj, pField0);
break; break;
case TSDB_ALTER_TABLE_ADD_COLUMN: case TSDB_ALTER_TABLE_ADD_COLUMN:
code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pSchemas, 1); code = mndAddSuperTableColumn(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
break; break;
case TSDB_ALTER_TABLE_DROP_COLUMN: case TSDB_ALTER_TABLE_DROP_COLUMN:
code = mndDropSuperTableColumn(pOld, &stbObj, pAlter->pSchemas[0].name); code = mndDropSuperTableColumn(pOld, &stbObj, pField0->name);
break; break;
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:
code = mndAlterStbColumnBytes(pOld, &stbObj, &pAlter->pSchemas[0]); code = mndAlterStbColumnBytes(pOld, &stbObj, pField0);
break; break;
default: default:
terrno = TSDB_CODE_MND_INVALID_STB_OPTION; terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
...@@ -1001,40 +1012,43 @@ ALTER_STB_OVER: ...@@ -1001,40 +1012,43 @@ ALTER_STB_OVER:
} }
static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) { static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SMAltertbReq *pAlter = pReq->rpcMsg.pCont; int32_t code = -1;
SDbObj *pDb = NULL;
SStbObj *pStb = NULL;
SMAltertbReq alterReq = {0};
mDebug("stb:%s, start to alter", pAlter->name); if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, &alterReq) == NULL) goto ALTER_STB_OVER;
if (mndCheckAlterStbReq(pAlter) != 0) { mDebug("stb:%s, start to alter", alterReq.name);
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr()); if (mndCheckAlterStbReq(&alterReq) != 0) goto ALTER_STB_OVER;
return -1;
}
SDbObj *pDb = mndAcquireDbByStb(pMnode, pAlter->name); pDb = mndAcquireDbByStb(pMnode, alterReq.name);
if (pDb == NULL) { if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB; terrno = TSDB_CODE_MND_INVALID_DB;
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr()); goto ALTER_STB_OVER;
return -1;
} }
SStbObj *pStb = mndAcquireStb(pMnode, pAlter->name); pStb = mndAcquireStb(pMnode, alterReq.name);
if (pStb == NULL) { if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_STB_NOT_EXIST; terrno = TSDB_CODE_MND_STB_NOT_EXIST;
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr()); goto ALTER_STB_OVER;
return -1;
} }
int32_t code = mndAlterStb(pMnode, pReq, pAlter, pDb, pStb); code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
mndReleaseStb(pMnode, pStb);
ALTER_STB_OVER:
if (code != 0) { if (code != 0) {
mError("stb:%s, failed to alter since %s", pAlter->name, terrstr()); mError("stb:%s, failed to alter since %s", alterReq.name, terrstr());
return code; } else {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
return TSDB_CODE_MND_ACTION_IN_PROGRESS; mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb);
taosArrayClear(alterReq.pFields);
return code;
} }
static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp) { static int32_t mndProcessVAlterStbRsp(SMnodeMsg *pRsp) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册