提交 ab805d41 编写于 作者: S slguan

[TD-10] refact create table

上级 b23c4cd1
......@@ -402,3 +402,4 @@ static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) {
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
rpcSendResponse(&rpcRsp);
}
......@@ -332,11 +332,10 @@ typedef struct {
} SMgmtHead;
typedef struct {
int32_t contLen;
int32_t vgId;
int32_t sid;
int32_t numOfVPeers;
uint64_t uid;
SVnodeDesc vpeerDesc[TSDB_MAX_MPEERS];
char tableId[TSDB_TABLE_ID_LEN + 1];
} SMDDropTableMsg;
......
......@@ -373,14 +373,10 @@ int32_t mgmtDropChildTable(SDbObj *pDb, SChildTableObj *pTable) {
}
strcpy(pRemove->tableId, pTable->tableId);
pRemove->sid = htonl(pTable->sid);
pRemove->uid = htobe64(pTable->uid);
pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes);
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip);
pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
pRemove->vgId = htonl(pTable->vgId);
pRemove->contLen = htonl(sizeof(SMDDropTableMsg));
pRemove->sid = htonl(pTable->sid);
pRemove->uid = htobe64(pTable->uid);
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
......
......@@ -411,12 +411,6 @@ int32_t mgmtDropNormalTable(SDbObj *pDb, SNormalTableObj *pTable) {
pRemove->sid = htonl(pTable->sid);
pRemove->uid = htobe64(pTable->uid);
pRemove->numOfVPeers = htonl(pVgroup->numOfVnodes);
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
pRemove->vpeerDesc[i].ip = htonl(pVgroup->vnodeGid[i].ip);
pRemove->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
}
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
mTrace("table:%s, send drop table msg", pRemove->tableId);
SRpcMsg rpcMsg = {
......
......@@ -135,80 +135,7 @@ int32_t mgmtGetTableMeta(SDbObj *pDb, STableInfo *pTable, STableMeta *pMeta, boo
}
static void mgmtCreateTable(SVgObj *pVgroup, SQueuedMsg *pMsg) {
SCMCreateTableMsg *pCreate = pMsg->pCont;
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId);
mgmtCreateVgroup(pMsg);
return;
}
int32_t code;
STableInfo *pTable;
SMDCreateTableMsg *pMDCreate = NULL;
if (pCreate->numOfColumns == 0) {
mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable);
} else {
mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable);
}
if (code != TSDB_CODE_SUCCESS) {
mTrace("table:%s, failed to create in vgroup:%d", pCreate->tableId, pVgroup->vgId);
mgmtSendSimpleResp(pMsg->thandle, code);
return;
}
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {
.handle = pMsg,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
pMsg->ahandle = pTable;
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
}
int32_t mgmtDropTable(SDbObj *pDb, char *tableId, int32_t ignore) {
STableInfo *pTable = mgmtGetTable(tableId);
if (pTable == NULL) {
if (ignore) {
mTrace("table:%s, table is not exist, think it success", tableId);
return TSDB_CODE_SUCCESS;
} else {
mError("table:%s, failed to create table, table not exist", tableId);
return TSDB_CODE_INVALID_TABLE;
}
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
mError("table:%s, failed to create table, in monitor database", tableId);
return TSDB_CODE_MONITOR_DB_FORBIDDEN;
}
switch (pTable->type) {
case TSDB_SUPER_TABLE:
mTrace("table:%s, start to drop super table", tableId);
return mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable);
case TSDB_CHILD_TABLE:
mTrace("table:%s, start to drop child table", tableId);
return mgmtDropChildTable(pDb, (SChildTableObj *) pTable);
case TSDB_NORMAL_TABLE:
mTrace("table:%s, start to drop normal table", tableId);
return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
case TSDB_STREAM_TABLE:
mTrace("table:%s, start to drop stream table", tableId);
return mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
default:
mError("table:%s, invalid table type:%d", tableId, pTable->type);
return TSDB_CODE_INVALID_TABLE;
}
}
int32_t mgmtAlterTable(SDbObj *pDb, SCMAlterTableMsg *pAlter) {
......@@ -494,29 +421,58 @@ void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) {
if (pVgroup == NULL) {
mTrace("table:%s, start to create a new vgroup", pCreate->tableId);
mgmtCreateVgroup(newMsg);
return;
}
int32_t sid = taosAllocateId(pVgroup->idPool);
if (sid < 0) {
mTrace("tables:%s, no enough sid in vgroup:%d", pVgroup->vgId);
mgmtCreateVgroup(newMsg);
return;
}
SMDCreateTableMsg *pMDCreate = NULL;
if (pCreate->numOfColumns == 0) {
mTrace("table:%s, is a child table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
code = mgmtCreateChildTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable);
} else {
mTrace("table:%s, vgroup:%d is selected", pCreate->tableId, pVgroup->vgId);
mgmtCreateTable(pVgroup, newMsg);
mTrace("table:%s, is a normal table, vgroup:%d sid:%d ahandle:%p", pCreate->tableId, pVgroup->vgId, sid, pMsg);
code = mgmtCreateNormalTable(pCreate, pMsg->contLen, pVgroup, sid, &pMDCreate, &pTable);
}
if (code != TSDB_CODE_SUCCESS) {
mTrace("table:%s, failed to create in vgroup:%d", pCreate->tableId, pVgroup->vgId);
mgmtSendSimpleResp(pMsg->thandle, code);
return;
}
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
SRpcMsg rpcMsg = {
.handle = newMsg,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
};
newMsg->ahandle = pTable;
mgmtSendMsgToDnode(&ipSet, &rpcMsg);
}
void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
SCMDropTableMsg *pDrop = pMsg->pCont;
if (mgmtCheckRedirect(pMsg->thandle)) return;
if (mgmtCheckRedirect(pMsg->thandle)) {
mError("thandle:%p, failed to drop table:%s, need redirect message", pMsg->thandle, pDrop->tableId);
return;
}
SCMDropTableMsg *pDrop = pMsg->pCont;
mTrace("table:%s, drop msg is received from thandle:%p", pDrop->tableId, pMsg->thandle);
SUserObj *pUser = mgmtGetUserFromConn(pMsg->thandle);
if (pUser == NULL) {
mError("table:%s, failed to drop table, invalid user", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_USER);
if (mgmtCheckExpired()) {
mError("table:%s, failed to drop, grant expired", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_GRANT_EXPIRED);
return;
}
if (!pUser->writeAuth) {
mError("table:%s, failed to drop table, no rights", pDrop->tableId);
if (!pMsg->pUser->writeAuth) {
mError("table:%s, failed to drop, no rights", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
return;
}
......@@ -528,9 +484,45 @@ void mgmtProcessDropTableMsg(SQueuedMsg *pMsg) {
return;
}
int32_t code = mgmtDropTable(pDb, pDrop->tableId, pDrop->igNotExists);
if (code != TSDB_CODE_ACTION_IN_PROGRESS) {
mgmtSendSimpleResp(pMsg->thandle, code);
STableInfo *pTable = mgmtGetTable(pDrop->tableId);
if (pTable == NULL) {
if (pDrop->igNotExists) {
mTrace("table:%s, table is not exist, think drop success", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS);
return;
} else {
mError("table:%s, failed to drop table, table not exist", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
return;
}
}
if (mgmtCheckIsMonitorDB(pDb->name, tsMonitorDbName)) {
mError("table:%s, failed to create table, in monitor database", pDrop->tableId);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_MONITOR_DB_FORBIDDEN);
return;
}
switch (pTable->type) {
case TSDB_SUPER_TABLE:
mTrace("table:%s, start to drop super table", pDrop->tableId);
mgmtDropSuperTable(pDb, (SSuperTableObj *) pTable);
break;
case TSDB_CHILD_TABLE:
mTrace("table:%s, start to drop child table", pDrop->tableId);
mgmtDropChildTable(pDb, (SChildTableObj *) pTable);
break;
case TSDB_NORMAL_TABLE:
mTrace("table:%s, start to drop normal table", pDrop->tableId);
mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
break;
case TSDB_STREAM_TABLE:
mTrace("table:%s, start to drop stream table", pDrop->tableId);
mgmtDropNormalTable(pDb, (SNormalTableObj *) pTable);
break;
default:
mError("table:%s, invalid table type:%d", pDrop->tableId, pTable->type);
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
}
}
......
......@@ -189,12 +189,12 @@ int32_t mgmtDropVgroup(SDbObj *pDb, SVgObj *pVgroup) {
STableInfo *pTable;
if (pVgroup->numOfTables > 0) {
for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) {
if (pVgroup->tableList != NULL) {
pTable = pVgroup->tableList[i];
if (pTable) mgmtDropTable(pDb, pTable->tableId, 0);
}
}
// for (int32_t i = 0; i < pDb->cfg.maxSessions; ++i) {
// if (pVgroup->tableList != NULL) {
// pTable = pVgroup->tableList[i];
// if (pTable) mgmtDropTable(pDb, pTable->tableId, 0);
// }
// }
}
mTrace("vgroup:%d, db:%s replica:%d is deleted", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册