提交 e61ee9fb 编写于 作者: L lichuang

[TD-4394]add modify column,tag width implementation

上级 b141d319
......@@ -93,6 +93,9 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg);
static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg);
static int32_t mnodeFindSuperTableColumnIndex(SSTableObj *pStable, char *colName);
static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg);
static int32_t mnodeChangeSuperTableTag(SMnodeMsg *pMsg);
static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg);
static void mnodeDestroyChildTable(SCTableObj *pTable) {
tfree(pTable->info.tableId);
......@@ -1457,31 +1460,52 @@ static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
return code;
}
static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) {
static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg) {
SAlterTableMsg *pAlter = pMsg->rpcMsg.pCont;
char* name = pAlter->schema[0].name;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
int32_t col = mnodeFindSuperTableColumnIndex(pStable, oldName);
int32_t col = mnodeFindSuperTableColumnIndex(pStable, name);
if (col < 0) {
mError("msg:%p, app:%p stable:%s, change column, oldName:%s, newName:%s", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, oldName, newName);
mError("msg:%p, app:%p stable:%s, change column, name:%s", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, name);
return TSDB_CODE_MND_FIELD_NOT_EXIST;
}
// int32_t rowSize = 0;
uint32_t len = (uint32_t)strlen(newName);
if (len >= TSDB_COL_NAME_LEN) {
return TSDB_CODE_MND_COL_NAME_TOO_LONG;
}
// update
SSchema *schema = (SSchema *) (pStable->schema + col);
ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR);
schema->bytes = pAlter->schema[0].bytes;
mInfo("msg:%p, app:%p stable %s, start to modify column %s len to %d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
name, schema->bytes);
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
.pTable = tsSuperTableSdb,
.pObj = pStable,
.pMsg = pMsg,
.fpRsp = mnodeChangeSuperTableColumnCb
};
return sdbUpdateRow(&row);
}
if (mnodeFindSuperTableColumnIndex(pStable, newName) >= 0) {
return TSDB_CODE_MND_FIELD_ALREAY_EXIST;
static int32_t mnodeChangeSuperTableTag(SMnodeMsg *pMsg) {
SAlterTableMsg *pAlter = pMsg->rpcMsg.pCont;
char* name = pAlter->schema[0].name;
SSTableObj *pStable = (SSTableObj *)pMsg->pTable;
int32_t col = mnodeFindSuperTableTagIndex(pStable, name);
if (col < 0) {
mError("msg:%p, app:%p stable:%s, change column, name:%s", pMsg, pMsg->rpcMsg.ahandle,
pStable->info.tableId, name);
return TSDB_CODE_MND_FIELD_NOT_EXIST;
}
// update
SSchema *schema = (SSchema *) (pStable->schema + col);
tstrncpy(schema->name, newName, sizeof(schema->name));
mInfo("msg:%p, app:%p stable %s, start to modify column %s to %s", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
oldName, newName);
ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR);
schema->bytes = pAlter->schema[0].bytes;
mInfo("msg:%p, app:%p stable %s, start to modify tag len %s to %d", pMsg, pMsg->rpcMsg.ahandle, pStable->info.tableId,
name, schema->bytes);
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
......@@ -2355,31 +2379,23 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
return sdbUpdateRow(&row);
}
static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) {
static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg) {
SAlterTableMsg *pAlter = pMsg->rpcMsg.pCont;
char* name = pAlter->schema[0].name;
SCTableObj *pTable = (SCTableObj *)pMsg->pTable;
int32_t col = mnodeFindNormalTableColumnIndex(pTable, oldName);
int32_t col = mnodeFindNormalTableColumnIndex(pTable, name);
if (col < 0) {
mError("msg:%p, app:%p ctable:%s, change column, oldName: %s, newName: %s", pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, oldName, newName);
mError("msg:%p, app:%p ctable:%s, change column, name: %s", pMsg, pMsg->rpcMsg.ahandle,
pTable->info.tableId, name);
return TSDB_CODE_MND_FIELD_NOT_EXIST;
}
// int32_t rowSize = 0;
uint32_t len = (uint32_t)strlen(newName);
if (len >= TSDB_COL_NAME_LEN) {
return TSDB_CODE_MND_COL_NAME_TOO_LONG;
}
if (mnodeFindNormalTableColumnIndex(pTable, newName) >= 0) {
return TSDB_CODE_MND_FIELD_ALREAY_EXIST;
}
// update
SSchema *schema = (SSchema *) (pTable->schema + col);
tstrncpy(schema->name, newName, sizeof(schema->name));
ASSERT(schema->type == TSDB_DATA_TYPE_BINARY || schema->type == TSDB_DATA_TYPE_NCHAR);
schema->bytes = pAlter->schema[0].bytes;
mInfo("msg:%p, app:%p ctable %s, start to modify column %s to %s", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
oldName, newName);
mInfo("msg:%p, app:%p ctable %s, start to modify column %s len to %d", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
name, schema->bytes);
SSdbRow row = {
.type = SDB_OPER_GLOBAL,
......@@ -3214,15 +3230,9 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
code = mnodeDropSuperTableColumn(pMsg, pAlter->schema[0].name);
} else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_COLUMN) {
//code = mnodeChangeSuperTableColumn(pMsg, pAlter->schema[0].name, pAlter->schema[1].name);
(void)mnodeChangeSuperTableColumn;
mError("change table[%s] column[%s] length to [%d] is not processed", pAlter->tableFname, pAlter->schema[0].name, pAlter->schema[0].bytes);
code = TSDB_CODE_SUCCESS;
code = mnodeChangeSuperTableColumn(pMsg);
} else if (pAlter->type == TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN) {
//code = mnodeChangeSuperTableColumn(pMsg, pAlter->schema[0].name, pAlter->schema[1].name);
(void)mnodeChangeSuperTableColumn;
mError("change table[%s] tag[%s] length to [%d] is not processed", pAlter->tableFname, pAlter->schema[0].name, pAlter->schema[0].bytes);
code = TSDB_CODE_SUCCESS;
code = mnodeChangeSuperTableTag(pMsg);
} else {
}
} else {
......@@ -3234,10 +3244,7 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
} else if (pAlter->type == TSDB_ALTER_TABLE_DROP_COLUMN) {
code = mnodeDropNormalTableColumn(pMsg, pAlter->schema[0].name);
} else if (pAlter->type == TSDB_ALTER_TABLE_CHANGE_COLUMN) {
//code = mnodeChangeNormalTableColumn(pMsg, pAlter->schema[0].name, pAlter->schema[1].name);
(void)mnodeChangeNormalTableColumn;
mError("change table[%s] column[%s] length to [%d] is not processed", pAlter->tableFname, pAlter->schema[0].name, pAlter->schema[0].bytes);
code = TSDB_CODE_SUCCESS;
code = mnodeChangeNormalTableColumn(pMsg);
} else {
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册