提交 62d35aad 编写于 作者: S Shengliang Guan

[TD-695] let normal table alter be a sync operation

上级 c6f1b02e
......@@ -1753,11 +1753,43 @@ static int32_t mnodeFindNormalTableColumnIndex(SChildTableObj *pTable, char *col
return -1;
}
static int32_t mnodeAddNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
static int32_t mnodeAlterNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
mLPrint("app:%p:%p, ctable %s, add column result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code));
return code;
if (code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, ctable %s, failed to alter column, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code));
return code;
}
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(NULL, pTable);
if (pMDCreate == NULL) {
return terrno;
}
if (pMsg->pVgroup == NULL) {
pMsg->pVgroup = mnodeGetVgroup(pTable->vgId);
if (pMsg->pVgroup == NULL) {
rpcFreeCont(pMDCreate);
mError("app:%p:%p, ctable %s, vgId:%d not exist in mnode", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
pTable->vgId);
return TSDB_CODE_MND_VGROUP_NOT_EXIST;
}
}
SRpcIpSet ipSet = mnodeGetIpSetFromVgroup(pMsg->pVgroup);
SRpcMsg rpcMsg = {
.handle = pMsg,
.pCont = pMDCreate,
.contLen = htonl(pMDCreate->contLen),
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_ALTER_TABLE
};
mTrace("app:%p:%p, ctable %s, send alter column msg to vgId:%d", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
pMsg->pVgroup->vgId);
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32_t ncols) {
......@@ -1802,7 +1834,7 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
.table = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.cb = mnodeAddNormalTableColumnCb
.cb = mnodeAlterNormalTableColumnCb
};
int32_t code = sdbUpdateRow(&oper);
......@@ -1813,13 +1845,6 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
return code;
}
static int32_t mnodeDropNormalTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
mLPrint("app:%p:%p, ctable %s, drop column result:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code));
return code;
}
static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
SDbObj *pDb = pMsg->pDb;
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
......@@ -1847,7 +1872,7 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
.table = tsChildTableSdb,
.pObj = pTable,
.pMsg = pMsg,
.cb = mnodeDropNormalTableColumnCb
.cb = mnodeAlterNormalTableColumnCb
};
int32_t code = sdbUpdateRow(&oper);
......@@ -2185,9 +2210,33 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
}
}
// not implemented yet
static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
mTrace("alter table rsp received, handle:%p code:%s", rpcMsg->handle, tstrerror(rpcMsg->code));
if (rpcMsg->handle == NULL) return;
SMnodeMsg *mnodeMsg = rpcMsg->handle;
mnodeMsg->received++;
SChildTableObj *pTable = (SChildTableObj *)mnodeMsg->pTable;
assert(pTable);
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
mTrace("app:%p:%p, ctable:%s, altered in dnode, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code));
dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS);
} else {
if (mnodeMsg->retry++ < 3) {
mTrace("app:%p:%p, table:%s, alter table rsp received, need retry, times:%d result:%s thandle:%p",
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->retry, tstrerror(rpcMsg->code),
mnodeMsg->rpcMsg.handle);
dnodeDelayReprocessMnodeWriteMsg(mnodeMsg);
} else {
mError("app:%p:%p, table:%s, failed to alter in dnode, result:%s thandle:%p", mnodeMsg->rpcMsg.ahandle, mnodeMsg,
pTable->info.tableId, tstrerror(rpcMsg->code), mnodeMsg->rpcMsg.handle);
dnodeSendRpcMnodeWriteRsp(mnodeMsg, rpcMsg->code);
}
}
}
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
......
......@@ -133,6 +133,13 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
}
static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
// TODO: disposed in tsdb
// STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont);
// if (pCfg == NULL) return terrno;
// if (tsdbCreateTable(pVnode->tsdb, pCfg) < 0) code = terrno;
// tsdbClearTableCfg(pCfg);
vTrace("vgId:%d, alter table msg is received", pVnode->vgId);
return TSDB_CODE_SUCCESS;
}
......
......@@ -180,11 +180,9 @@ if $data28 != null then
endi
print ======== step4
sleep 2500
sql alter table tb drop column d
sql alter table tb drop column e
sql insert into tb values(now-19d, -19, 6, 3, 0)
sleep 3000
sql select * from tb order by ts desc
if $rows != 4 then
return -1
......@@ -287,10 +285,8 @@ if $data38 != null then
endi
print ======== step5
sleep 2500
sql alter table tb drop column g
sql insert into tb values(now-16d, -16, 9, 5)
sleep 3000
sql select count(f) from tb
if $data00 != 5 then
return -1
......@@ -421,10 +417,8 @@ if $data48 != null then
endi
print ======== step6
sleep 2500
sql alter table tb drop column f
sql insert into tb values(now-13d, -13, 7)
sleep 3000
sql select * from tb order by ts desc
if $rows != 6 then
return -1
......@@ -551,10 +545,8 @@ if $data58 != null then
endi
print ======== step7
sleep 2500
sql alter table tb drop column h
sql insert into tb values(now-10d, -10)
sleep 3000
sql select * from tb order by ts desc
if $rows != 7 then
return -1
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册