提交 607ca02f 编写于 作者: S Shengliang Guan

[TD-997]

上级 10541dd2
...@@ -42,6 +42,7 @@ typedef struct SMnodeMsg { ...@@ -42,6 +42,7 @@ typedef struct SMnodeMsg {
int8_t expected; int8_t expected;
int8_t retry; int8_t retry;
int32_t code; int32_t code;
void * pObj;
struct SAcctObj * pAcct; struct SAcctObj * pAcct;
struct SDnodeObj *pDnode; struct SDnodeObj *pDnode;
struct SUserObj * pUser; struct SUserObj * pUser;
......
...@@ -47,15 +47,16 @@ typedef enum { ...@@ -47,15 +47,16 @@ typedef enum {
SDB_OPER_LOCAL SDB_OPER_LOCAL
} ESdbOper; } ESdbOper;
typedef struct { typedef struct SSdbOper {
ESdbOper type; ESdbOper type;
void * table;
void * pObj;
void * rowData;
int32_t rowSize; int32_t rowSize;
int32_t retCode; // for callback in sdb queue int32_t retCode; // for callback in sdb queue
int32_t processedCount; // for sync fwd callback int32_t processedCount; // for sync fwd callback
int32_t (*cb)(struct SMnodeMsg *pMsg, int32_t code); int32_t (*reqFp)(struct SMnodeMsg *pMsg);
int32_t (*writeCb)(struct SMnodeMsg *pMsg, int32_t code);
void * table;
void * pObj;
void * rowData;
struct SMnodeMsg *pMsg; struct SMnodeMsg *pMsg;
} SSdbOper; } SSdbOper;
...@@ -86,6 +87,7 @@ void sdbUpdateMnodeRoles(); ...@@ -86,6 +87,7 @@ void sdbUpdateMnodeRoles();
int32_t sdbInsertRow(SSdbOper *pOper); int32_t sdbInsertRow(SSdbOper *pOper);
int32_t sdbDeleteRow(SSdbOper *pOper); int32_t sdbDeleteRow(SSdbOper *pOper);
int32_t sdbUpdateRow(SSdbOper *pOper); int32_t sdbUpdateRow(SSdbOper *pOper);
int32_t sdbInsertRowImp(SSdbOper *pOper);
void *sdbGetRow(void *handle, void *key); void *sdbGetRow(void *handle, void *key);
void *sdbFetchRow(void *handle, void *pIter, void **ppRow); void *sdbFetchRow(void *handle, void *pIter, void **ppRow);
......
...@@ -85,7 +85,7 @@ static int32_t mnodeAcctActionRestored() { ...@@ -85,7 +85,7 @@ static int32_t mnodeAcctActionRestored() {
if (numOfRows <= 0 && dnodeIsFirstDeploy()) { if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
mInfo("dnode first deploy, create root acct"); mInfo("dnode first deploy, create root acct");
int32_t code = mnodeCreateRootAcct(); int32_t code = mnodeCreateRootAcct();
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("failed to create root account, reason:%s", tstrerror(code)); mError("failed to create root account, reason:%s", tstrerror(code));
return code; return code;
} }
......
...@@ -71,7 +71,7 @@ static int32_t mnodeClusterActionRestored() { ...@@ -71,7 +71,7 @@ static int32_t mnodeClusterActionRestored() {
if (numOfRows <= 0 && dnodeIsFirstDeploy()) { if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
mInfo("dnode first deploy, create cluster"); mInfo("dnode first deploy, create cluster");
int32_t code = mnodeCreateCluster(); int32_t code = mnodeCreateCluster();
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("failed to create cluster, reason:%s", tstrerror(code)); mError("failed to create cluster, reason:%s", tstrerror(code));
return code; return code;
} }
...@@ -159,16 +159,15 @@ int32_t mnodeGetClusterId() { ...@@ -159,16 +159,15 @@ int32_t mnodeGetClusterId() {
void mnodeUpdateClusterId() { void mnodeUpdateClusterId() {
SClusterObj *pCluster = NULL; SClusterObj *pCluster = NULL;
mnodeGetNextCluster(NULL, &pCluster); void *pIter = mnodeGetNextCluster(NULL, &pCluster);
if (pCluster != NULL) { if (pCluster != NULL) {
tsClusterId = pCluster->clusterId; tsClusterId = pCluster->clusterId;
mnodeDecClusterRef(pCluster);
mInfo("cluster id is %d", tsClusterId); mInfo("cluster id is %d", tsClusterId);
} else {
//assert(false);
} }
}
mnodeDecClusterRef(pCluster);
sdbFreeIter(pIter);
}
static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mnodeGetClusterMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
......
...@@ -324,8 +324,10 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) { ...@@ -324,8 +324,10 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) {
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
if (pDb != NULL) { if (code == TSDB_CODE_SUCCESS) {
mLInfo("db:%s, is created by %s", pDb->name, mnodeGetUserFromMsg(pMsg)); mLInfo("db:%s, is created by %s", pDb->name, mnodeGetUserFromMsg(pMsg));
} else {
mError("db:%s, failed to create by %s, reason:%s", pDb->name, mnodeGetUserFromMsg(pMsg), tstrerror(code));
} }
return code; return code;
...@@ -386,17 +388,16 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs ...@@ -386,17 +388,16 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCMCreateDbMsg *pCreate, void *pMs
.pObj = pDb, .pObj = pDb,
.rowSize = sizeof(SDbObj), .rowSize = sizeof(SDbObj),
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeCreateDbCb .writeCb = mnodeCreateDbCb
}; };
code = sdbInsertRow(&oper); code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mLInfo("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code)); mError("db:%s, failed to create, reason:%s", pDb->name, tstrerror(code));
mnodeDestroyDb(pDb); mnodeDestroyDb(pDb);
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
return code;
} }
bool mnodeCheckIsMonitorDB(char *db, char *monitordb) { bool mnodeCheckIsMonitorDB(char *db, char *monitordb) {
...@@ -754,8 +755,8 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) { ...@@ -754,8 +755,8 @@ static int32_t mnodeSetDbDropping(SDbObj *pDb) {
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
return TSDB_CODE_MND_SDB_ERROR; mError("db:%s, failed to set dropping state, reason:%s", pDb->name, tstrerror(code));
} }
return code; return code;
...@@ -947,12 +948,12 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) { ...@@ -947,12 +948,12 @@ static int32_t mnodeAlterDb(SDbObj *pDb, SCMAlterDbMsg *pAlter, void *pMsg) {
.table = tsDbSdb, .table = tsDbSdb,
.pObj = pDb, .pObj = pDb,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeAlterDbCb .writeCb = mnodeAlterDbCb
}; };
code = sdbUpdateRow(&oper); code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; mError("db:%s, failed to alter, reason:%s", pDb->name, tstrerror(code));
} }
} }
...@@ -995,16 +996,16 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) { ...@@ -995,16 +996,16 @@ static int32_t mnodeDropDb(SMnodeMsg *pMsg) {
mInfo("db:%s, drop db from sdb", pDb->name); mInfo("db:%s, drop db from sdb", pDb->name);
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsDbSdb, .table = tsDbSdb,
.pObj = pDb, .pObj = pDb,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeDropDbCb .writeCb = mnodeDropDbCb
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&oper);
if (code == TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS; mError("db:%s, failed to drop, reason:%s", pDb->name, tstrerror(code));
} }
return code; return code;
...@@ -1031,7 +1032,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) { ...@@ -1031,7 +1032,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
} }
int32_t code = mnodeSetDbDropping(pMsg->pDb); int32_t code = mnodeSetDbDropping(pMsg->pDb);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code)); mError("db:%s, failed to drop, reason:%s", pDrop->db, tstrerror(code));
return code; return code;
} }
......
...@@ -261,7 +261,8 @@ void mnodeUpdateDnode(SDnodeObj *pDnode) { ...@@ -261,7 +261,8 @@ void mnodeUpdateDnode(SDnodeObj *pDnode) {
.pObj = pDnode .pObj = pDnode
}; };
if (sdbUpdateRow(&oper) != 0) { int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("dnodeId:%d, failed update", pDnode->dnodeId); mError("dnodeId:%d, failed update", pDnode->dnodeId);
} }
} }
...@@ -501,13 +502,12 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) { ...@@ -501,13 +502,12 @@ static int32_t mnodeCreateDnode(char *ep, SMnodeMsg *pMsg) {
}; };
int32_t code = sdbInsertRow(&oper); int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
int dnodeId = pDnode->dnodeId; int dnodeId = pDnode->dnodeId;
tfree(pDnode); tfree(pDnode);
mError("failed to create dnode:%d, result:%s", dnodeId, tstrerror(code)); mError("failed to create dnode:%d, reason:%s", dnodeId, tstrerror(code));
} else { } else {
mInfo("dnode:%d is created, result:%s", pDnode->dnodeId, tstrerror(code)); mLInfo("dnode:%d is created", pDnode->dnodeId);
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
return code; return code;
...@@ -522,9 +522,10 @@ int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) { ...@@ -522,9 +522,10 @@ int32_t mnodeDropDnode(SDnodeObj *pDnode, void *pMsg) {
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&oper);
if (code == TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mLInfo("dnode:%d, is dropped from cluster, result:%s", pDnode->dnodeId, tstrerror(code)); mError("dnode:%d, failed to drop from cluster, result:%s", pDnode->dnodeId, tstrerror(code));
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; } else {
mLInfo("dnode:%d, is dropped from cluster", pDnode->dnodeId);
} }
return code; return code;
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tglobal.h"
#include "trpc.h" #include "trpc.h"
#include "tsync.h" #include "tsync.h"
#include "tbalance.h" #include "tbalance.h"
...@@ -31,8 +32,6 @@ ...@@ -31,8 +32,6 @@
#include "mnodeShow.h" #include "mnodeShow.h"
#include "mnodeUser.h" #include "mnodeUser.h"
#include "tglobal.h"
static void * tsMnodeSdb = NULL; static void * tsMnodeSdb = NULL;
static int32_t tsMnodeUpdateSize = 0; static int32_t tsMnodeUpdateSize = 0;
static SRpcEpSet tsMnodeEpSetForShell; static SRpcEpSet tsMnodeEpSetForShell;
...@@ -279,9 +278,8 @@ int32_t mnodeAddMnode(int32_t dnodeId) { ...@@ -279,9 +278,8 @@ int32_t mnodeAddMnode(int32_t dnodeId) {
}; };
int32_t code = sdbInsertRow(&oper); int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
tfree(pMnode); tfree(pMnode);
code = TSDB_CODE_MND_SDB_ERROR;
} }
mnodeUpdateMnodeEpSet(); mnodeUpdateMnodeEpSet();
...@@ -313,9 +311,6 @@ int32_t mnodeDropMnode(int32_t dnodeId) { ...@@ -313,9 +311,6 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&oper);
if (code != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_SDB_ERROR;
}
sdbDecRef(tsMnodeSdb, pMnode); sdbDecRef(tsMnodeSdb, pMnode);
......
...@@ -101,6 +101,11 @@ static int32_t sdbInitWriteWorker(); ...@@ -101,6 +101,11 @@ static int32_t sdbInitWriteWorker();
static void sdbCleanupWriteWorker(); static void sdbCleanupWriteWorker();
static int32_t sdbAllocWriteQueue(); static int32_t sdbAllocWriteQueue();
static void sdbFreeWritequeue(); static void sdbFreeWritequeue();
static int32_t sdbUpdateRowImp(SSdbOper *pOper);
static int32_t sdbDeleteRowImp(SSdbOper *pOper);
static int32_t sdbInsertHash(SSdbTable *pTable, SSdbOper *pOper);
static int32_t sdbUpdateHash(SSdbTable *pTable, SSdbOper *pOper);
static int32_t sdbDeleteHash(SSdbTable *pTable, SSdbOper *pOper);
int32_t sdbGetId(void *handle) { int32_t sdbGetId(void *handle) {
return ((SSdbTable *)handle)->autoIndex; return ((SSdbTable *)handle)->autoIndex;
...@@ -260,8 +265,20 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { ...@@ -260,8 +265,20 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
tstrerror(code)); tstrerror(code));
} }
if (pOper->cb != NULL) { // failed to forward, need revert insert
pOper->retCode = (*pOper->cb)(pMsg, pOper->retCode); if (pOper->retCode != TSDB_CODE_SUCCESS) {
SWalHead *pHead = (void *)pOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
int32_t action = pHead->msgType % 10;
sdbError("table:%s record:%p:%s ver:%" PRIu64 ", action:%d failed to foward reason:%s",
((SSdbTable *)pOper->table)->tableName, pOper->pObj, sdbGetKeyStr(pOper->table, pHead->cont),
pHead->version, action, tstrerror(pOper->retCode));
if (action == SDB_ACTION_INSERT) {
sdbDeleteHash(pOper->table, pOper);
}
}
if (pOper->writeCb != NULL) {
pOper->retCode = (*pOper->writeCb)(pMsg, pOper->retCode);
} }
dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode); dnodeSendRpcMnodeWriteRsp(pMsg, pOper->retCode);
...@@ -269,6 +286,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) { ...@@ -269,6 +286,7 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
if (ahandle == NULL) { if (ahandle == NULL) {
sdbDecRef(pOper->table, pOper->pObj); sdbDecRef(pOper->table, pOper->pObj);
} }
taosFreeQitem(pOper); taosFreeQitem(pOper);
} }
...@@ -609,7 +627,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) { ...@@ -609,7 +627,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE; if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
if (sdbGetRowFromObj(pTable, pOper->pObj)) { if (sdbGetRowFromObj(pTable, pOper->pObj)) {
sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName, sdbGetKeyStrFromObj(pTable, pOper->pObj)); sdbError("table:%s, failed to insert record:%s, already exist", pTable->tableName,
sdbGetKeyStrFromObj(pTable, pOper->pObj));
sdbDecRef(pTable, pOper->pObj); sdbDecRef(pTable, pOper->pObj);
return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE; return TSDB_CODE_MND_SDB_OBJ_ALREADY_THERE;
} }
...@@ -634,9 +653,20 @@ int32_t sdbInsertRow(SSdbOper *pOper) { ...@@ -634,9 +653,20 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pOper->reqFp) {
return (*pOper->reqFp)(pOper->pMsg);
} else {
return sdbInsertRowImp(pOper);
}
}
int32_t sdbInsertRowImp(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
SSdbOper *pNewOper = taosAllocateQitem(size); SSdbOper *pNewOper = taosAllocateQitem(size);
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK; SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
pHead->version = 0; pHead->version = 0;
pHead->len = pOper->rowSize; pHead->len = pOper->rowSize;
...@@ -655,7 +685,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) { ...@@ -655,7 +685,8 @@ int32_t sdbInsertRow(SSdbOper *pOper) {
sdbIncRef(pNewOper->table, pNewOper->pObj); sdbIncRef(pNewOper->table, pNewOper->pObj);
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
return TSDB_CODE_SUCCESS;
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
bool sdbCheckRowDeleted(void *pTableInput, void *pRow) { bool sdbCheckRowDeleted(void *pTableInput, void *pRow) {
...@@ -664,7 +695,6 @@ bool sdbCheckRowDeleted(void *pTableInput, void *pRow) { ...@@ -664,7 +695,6 @@ bool sdbCheckRowDeleted(void *pTableInput, void *pRow) {
int32_t *updateEnd = pRow + pTable->refCountPos - 4; int32_t *updateEnd = pRow + pTable->refCountPos - 4;
return atomic_val_compare_exchange_32(updateEnd, 1, 1) == 1; return atomic_val_compare_exchange_32(updateEnd, 1, 1) == 1;
// return (*updateEnd == 1);
} }
int32_t sdbDeleteRow(SSdbOper *pOper) { int32_t sdbDeleteRow(SSdbOper *pOper) {
...@@ -692,13 +722,24 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { ...@@ -692,13 +722,24 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pOper->reqFp) {
return (*pOper->reqFp)(pOper->pMsg);
} else {
return sdbDeleteRowImp(pOper);
}
}
int32_t sdbDeleteRowImp(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
SSdbOper *pNewOper = taosAllocateQitem(size); SSdbOper *pNewOper = taosAllocateQitem(size);
SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK; SWalHead *pHead = (void *)pNewOper + sizeof(SSdbOper) + SDB_SYNC_HACK;
pHead->version = 0; pHead->version = 0;
pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE; pHead->msgType = pTable->tableId * 10 + SDB_ACTION_DELETE;
pOper->rowData = pHead->cont; pOper->rowData = pHead->cont;
(*pTable->encodeFp)(pOper); (*pTable->encodeFp)(pOper);
pHead->len = pOper->rowSize; pHead->len = pOper->rowSize;
...@@ -711,7 +752,8 @@ int32_t sdbDeleteRow(SSdbOper *pOper) { ...@@ -711,7 +752,8 @@ int32_t sdbDeleteRow(SSdbOper *pOper) {
} }
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
return TSDB_CODE_SUCCESS;
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
int32_t sdbUpdateRow(SSdbOper *pOper) { int32_t sdbUpdateRow(SSdbOper *pOper) {
...@@ -735,6 +777,17 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { ...@@ -735,6 +777,17 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (pOper->reqFp) {
return (*pOper->reqFp)(pOper->pMsg);
} else {
return sdbUpdateRowImp(pOper);
}
}
int32_t sdbUpdateRowImp(SSdbOper *pOper) {
SSdbTable *pTable = (SSdbTable *)pOper->table;
if (pTable == NULL) return TSDB_CODE_MND_SDB_INVALID_TABLE_TYPE;
int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK; int32_t size = sizeof(SSdbOper) + sizeof(SWalHead) + pTable->maxRowSize + SDB_SYNC_HACK;
SSdbOper *pNewOper = taosAllocateQitem(size); SSdbOper *pNewOper = taosAllocateQitem(size);
...@@ -755,7 +808,8 @@ int32_t sdbUpdateRow(SSdbOper *pOper) { ...@@ -755,7 +808,8 @@ int32_t sdbUpdateRow(SSdbOper *pOper) {
sdbIncRef(pNewOper->table, pNewOper->pObj); sdbIncRef(pNewOper->table, pNewOper->pObj);
taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper); taosWriteQitem(tsSdbWriteQueue, TAOS_QTYPE_RPC, pNewOper);
return TSDB_CODE_SUCCESS;
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
void *sdbFetchRow(void *handle, void *pNode, void **ppRow) { void *sdbFetchRow(void *handle, void *pNode, void **ppRow) {
......
...@@ -865,18 +865,17 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -865,18 +865,17 @@ static int32_t mnodeProcessCreateSuperTableMsg(SMnodeMsg *pMsg) {
.pObj = pStable, .pObj = pStable,
.rowSize = sizeof(SSuperTableObj) + schemaSize, .rowSize = sizeof(SSuperTableObj) + schemaSize,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeCreateSuperTableCb .writeCb = mnodeCreateSuperTableCb
}; };
int32_t code = sdbInsertRow(&oper); int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroySuperTable(pStable); mnodeDestroySuperTable(pStable);
pMsg->pTable = NULL; pMsg->pTable = NULL;
mError("app:%p:%p, table:%s, failed to create, sdb error", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); mError("app:%p:%p, table:%s, failed to create, sdb error", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
return code;
} }
static int32_t mnodeDropSuperTableCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeDropSuperTableCb(SMnodeMsg *pMsg, int32_t code) {
...@@ -924,13 +923,15 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) { ...@@ -924,13 +923,15 @@ static int32_t mnodeProcessDropSuperTableMsg(SMnodeMsg *pMsg) {
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeDropSuperTableCb .writeCb = mnodeDropSuperTableCb
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&oper);
if (code == TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; mError("app:%p:%p, table:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pStable->info.tableId,
tstrerror(code));
} }
return code; return code;
} }
...@@ -995,15 +996,10 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t ...@@ -995,15 +996,10 @@ static int32_t mnodeAddSuperTableTag(SMnodeMsg *pMsg, SSchema schema[], int32_t
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeAddSuperTableTagCb .writeCb = mnodeAddSuperTableTagCb
}; };
int32_t code = sdbUpdateRow(&oper); return sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
} }
static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeDropSuperTableTagCb(SMnodeMsg *pMsg, int32_t code) {
...@@ -1034,15 +1030,10 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) { ...@@ -1034,15 +1030,10 @@ static int32_t mnodeDropSuperTableTag(SMnodeMsg *pMsg, char *tagName) {
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeDropSuperTableTagCb .writeCb = mnodeDropSuperTableTagCb
}; };
int32_t code = sdbUpdateRow(&oper); return sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
} }
static int32_t mnodeModifySuperTableTagNameCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeModifySuperTableTagNameCb(SMnodeMsg *pMsg, int32_t code) {
...@@ -1083,15 +1074,10 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c ...@@ -1083,15 +1074,10 @@ static int32_t mnodeModifySuperTableTagName(SMnodeMsg *pMsg, char *oldTagName, c
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeModifySuperTableTagNameCb .writeCb = mnodeModifySuperTableTagNameCb
}; };
int32_t code = sdbUpdateRow(&oper); return sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
} }
static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName) { static int32_t mnodeFindSuperTableColumnIndex(SSuperTableObj *pStable, char *colName) {
...@@ -1162,15 +1148,10 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32 ...@@ -1162,15 +1148,10 @@ static int32_t mnodeAddSuperTableColumn(SMnodeMsg *pMsg, SSchema schema[], int32
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeAddSuperTableColumnCb .writeCb = mnodeAddSuperTableColumnCb
}; };
int32_t code = sdbUpdateRow(&oper); return sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
} }
static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeDropSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
...@@ -1212,15 +1193,10 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) { ...@@ -1212,15 +1193,10 @@ static int32_t mnodeDropSuperTableColumn(SMnodeMsg *pMsg, char *colName) {
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeDropSuperTableColumnCb .writeCb = mnodeDropSuperTableColumnCb
}; };
int32_t code = sdbUpdateRow(&oper); return sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
} }
static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeChangeSuperTableColumnCb(SMnodeMsg *pMsg, int32_t code) {
...@@ -1261,15 +1237,10 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char ...@@ -1261,15 +1237,10 @@ static int32_t mnodeChangeSuperTableColumn(SMnodeMsg *pMsg, char *oldName, char
.table = tsSuperTableSdb, .table = tsSuperTableSdb,
.pObj = pStable, .pObj = pStable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeChangeSuperTableColumnCb .writeCb = mnodeChangeSuperTableColumnCb
}; };
int32_t code = sdbUpdateRow(&oper); return sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
} }
// show super tables // show super tables
...@@ -1645,20 +1616,12 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO ...@@ -1645,20 +1616,12 @@ static void *mnodeBuildCreateChildTableMsg(SCMCreateTableMsg *pMsg, SChildTableO
return pCreate; return pCreate;
} }
static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeDoCreateChildTableFp(SMnodeMsg *pMsg) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable; SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
assert(pTable); assert(pTable);
if (code == TSDB_CODE_SUCCESS) { mDebug("app:%p:%p, table:%s, created in mnode, vgId:%d sid:%d, uid:%" PRIu64, pMsg->rpcMsg.ahandle, pMsg,
mDebug("app:%p:%p, table:%s, created in mnode, vgId:%d sid:%d, uid:%" PRIu64 ", result:%s", pMsg->rpcMsg.ahandle, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid);
pMsg, pTable->info.tableId, pTable->vgId, pTable->sid, pTable->uid, tstrerror(code));
} else {
mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
return code;
}
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable); SMDCreateTableMsg *pMDCreate = mnodeBuildCreateChildTableMsg(pCreate, pTable);
...@@ -1679,6 +1642,34 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -1679,6 +1642,34 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
SChildTableObj *pTable = (SChildTableObj *)pMsg->pTable;
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
assert(pTable);
if (code == TSDB_CODE_SUCCESS) {
if (pCreate->getMeta) {
mDebug("app:%p:%p, table:%s, created in dnode and continue to get meta, thandle:%p", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pMsg->rpcMsg.handle);
pMsg->retry = 0;
dnodeReprocessMnodeWriteMsg(pMsg);
} else {
mDebug("app:%p:%p, table:%s, created in dnode, thandle:%p", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
pMsg->rpcMsg.handle);
dnodeSendRpcMnodeWriteRsp(pMsg, TSDB_CODE_SUCCESS);
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} else {
mError("app:%p:%p, table:%s, failed to create table sid:%d, uid:%" PRIu64 ", reason:%s", pMsg->rpcMsg.ahandle, pMsg,
pTable->info.tableId, pTable->sid, pTable->uid, tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pTable, .table = tsChildTableSdb};
sdbDeleteRow(&desc);
return code;
}
}
static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
SVgObj *pVgroup = pMsg->pVgroup; SVgObj *pVgroup = pMsg->pVgroup;
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont; SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
...@@ -1752,23 +1743,23 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) { ...@@ -1752,23 +1743,23 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
pMsg->pTable = (STableObj *)pTable; pMsg->pTable = (STableObj *)pTable;
mnodeIncTableRef(pMsg->pTable); mnodeIncTableRef(pMsg->pTable);
SSdbOper desc = {0}; SSdbOper desc = {
desc.type = SDB_OPER_GLOBAL; .type = SDB_OPER_GLOBAL,
desc.pObj = pTable; .pObj = pTable,
desc.table = tsChildTableSdb; .table = tsChildTableSdb,
desc.pMsg = pMsg; .pMsg = pMsg,
desc.cb = mnodeDoCreateChildTableCb; .reqFp = mnodeDoCreateChildTableFp
};
int32_t code = sdbInsertRow(&desc); int32_t code = sdbInsertRow(&desc);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeDestroyChildTable(pTable); mnodeDestroyChildTable(pTable);
pMsg->pTable = NULL; pMsg->pTable = NULL;
mError("app:%p:%p, table:%s, update sdb error, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId, mError("app:%p:%p, table:%s, failed to create, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId,
tstrerror(code)); tstrerror(code));
return code;
} else {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
return code;
} }
static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
...@@ -1813,7 +1804,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) { ...@@ -1813,7 +1804,7 @@ static int32_t mnodeProcessCreateChildTableMsg(SMnodeMsg *pMsg) {
return terrno; return terrno;
} else { } else {
mDebug("app:%p:%p, table:%s, send create msg to vnode again", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId); mDebug("app:%p:%p, table:%s, send create msg to vnode again", pMsg->rpcMsg.ahandle, pMsg, pCreate->tableId);
return mnodeDoCreateChildTableCb(pMsg, TSDB_CODE_SUCCESS); return mnodeDoCreateChildTableFp(pMsg);
} }
} }
...@@ -1878,13 +1869,15 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) { ...@@ -1878,13 +1869,15 @@ static int32_t mnodeProcessDropChildTableMsg(SMnodeMsg *pMsg) {
.table = tsChildTableSdb, .table = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeDropChildTableCb .writeCb = mnodeDropChildTableCb
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&oper);
if (code == TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; mError("app:%p:%p, ctable:%s, failed to drop, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pTable->info.tableId,
tstrerror(code));
} }
return code; return code;
} }
...@@ -1980,15 +1973,10 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3 ...@@ -1980,15 +1973,10 @@ static int32_t mnodeAddNormalTableColumn(SMnodeMsg *pMsg, SSchema schema[], int3
.table = tsChildTableSdb, .table = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeAlterNormalTableColumnCb .writeCb = mnodeAlterNormalTableColumnCb
}; };
int32_t code = sdbUpdateRow(&oper); return sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
} }
static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
...@@ -2018,15 +2006,10 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) { ...@@ -2018,15 +2006,10 @@ static int32_t mnodeDropNormalTableColumn(SMnodeMsg *pMsg, char *colName) {
.table = tsChildTableSdb, .table = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeAlterNormalTableColumnCb .writeCb = mnodeAlterNormalTableColumnCb
}; };
int32_t code = sdbUpdateRow(&oper); return sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
} }
static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) { static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char *newName) {
...@@ -2060,15 +2043,10 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char ...@@ -2060,15 +2043,10 @@ static int32_t mnodeChangeNormalTableColumn(SMnodeMsg *pMsg, char *oldName, char
.table = tsChildTableSdb, .table = tsChildTableSdb,
.pObj = pTable, .pObj = pTable,
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeAlterNormalTableColumnCb .writeCb = mnodeAlterNormalTableColumnCb
}; };
int32_t code = sdbUpdateRow(&oper); return sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
return code;
} }
static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pTable) { static int32_t mnodeSetSchemaFromNormalTable(SSchema *pSchema, SChildTableObj *pTable) {
...@@ -2374,19 +2352,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { ...@@ -2374,19 +2352,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
} }
if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) { if (rpcMsg->code == TSDB_CODE_SUCCESS || rpcMsg->code == TSDB_CODE_TDB_TABLE_ALREADY_EXIST) {
SCMCreateTableMsg *pCreate = mnodeMsg->rpcMsg.pCont; SSdbOper desc = {
if (pCreate->getMeta) { .type = SDB_OPER_GLOBAL,
mDebug("app:%p:%p, table:%s, created in dnode and continue to get meta, thandle:%p result:%s", .pObj = pTable,
mnodeMsg->rpcMsg.ahandle, mnodeMsg, pTable->info.tableId, mnodeMsg->rpcMsg.handle, .table = tsChildTableSdb,
tstrerror(rpcMsg->code)); .pMsg = mnodeMsg,
.writeCb = mnodeDoCreateChildTableCb
mnodeMsg->retry = 0; };
dnodeReprocessMnodeWriteMsg(mnodeMsg);
} else { int32_t code = sdbInsertRowImp(&desc);
mDebug("app:%p:%p, table:%s, created in dnode, thandle:%p result:%s", mnodeMsg->rpcMsg.ahandle, mnodeMsg, if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
pTable->info.tableId, mnodeMsg->rpcMsg.handle, tstrerror(rpcMsg->code)); mnodeMsg->pTable = NULL;
mnodeDestroyChildTable(pTable);
dnodeSendRpcMnodeWriteRsp(mnodeMsg, TSDB_CODE_SUCCESS); dnodeSendRpcMnodeWriteRsp(mnodeMsg, code);
} }
} else { } else {
if (mnodeMsg->retry++ < 10) { if (mnodeMsg->retry++ < 10) {
......
...@@ -182,9 +182,10 @@ static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) { ...@@ -182,9 +182,10 @@ static int32_t mnodeUpdateUser(SUserObj *pUser, void *pMsg) {
}; };
int32_t code = sdbUpdateRow(&oper); int32_t code = sdbUpdateRow(&oper);
if (code == TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to alter by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
} else {
mLInfo("user:%s, is altered by %s", pUser->user, mnodeGetUserFromMsg(pMsg)); mLInfo("user:%s, is altered by %s", pUser->user, mnodeGetUserFromMsg(pMsg));
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
return code; return code;
...@@ -236,11 +237,11 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) { ...@@ -236,11 +237,11 @@ int32_t mnodeCreateUser(SAcctObj *pAcct, char *name, char *pass, void *pMsg) {
}; };
code = sdbInsertRow(&oper); code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to create by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
tfree(pUser); tfree(pUser);
} else { } else {
mLInfo("user:%s, is created by %s", pUser->user, mnodeGetUserFromMsg(pMsg)); mLInfo("user:%s, is created by %s", pUser->user, mnodeGetUserFromMsg(pMsg));
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
return code; return code;
...@@ -255,9 +256,10 @@ static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) { ...@@ -255,9 +256,10 @@ static int32_t mnodeDropUser(SUserObj *pUser, void *pMsg) {
}; };
int32_t code = sdbDeleteRow(&oper); int32_t code = sdbDeleteRow(&oper);
if (code == TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("user:%s, failed to drop by %s, reason:%s", pUser->user, mnodeGetUserFromMsg(pMsg), tstrerror(code));
} else {
mLInfo("user:%s, is dropped by %s", pUser->user, mnodeGetUserFromMsg(pMsg)); mLInfo("user:%s, is dropped by %s", pUser->user, mnodeGetUserFromMsg(pMsg));
if (pMsg != NULL) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
return code; return code;
......
...@@ -256,7 +256,8 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) { ...@@ -256,7 +256,8 @@ void mnodeUpdateVgroup(SVgObj *pVgroup) {
.pObj = pVgroup .pObj = pVgroup
}; };
if (sdbUpdateRow(&oper) != TSDB_CODE_SUCCESS) { int32_t code = sdbUpdateRow(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("vgId:%d, failed to update vgroup", pVgroup->vgId); mError("vgId:%d, failed to update vgroup", pVgroup->vgId);
} }
mnodeSendAlterVgroupMsg(pVgroup); mnodeSendAlterVgroupMsg(pVgroup);
...@@ -476,23 +477,11 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) { ...@@ -476,23 +477,11 @@ void *mnodeGetNextVgroup(void *pIter, SVgObj **pVgroup) {
return sdbFetchRow(tsVgroupSdb, pIter, (void **)pVgroup); return sdbFetchRow(tsVgroupSdb, pIter, (void **)pVgroup);
} }
static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { static int32_t mnodeCreateVgroupFp(SMnodeMsg *pMsg) {
SVgObj *pVgroup = pMsg->pVgroup; SVgObj *pVgroup = pMsg->pVgroup;
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
assert(pVgroup); assert(pVgroup);
if (code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
sdbDeleteRow(&desc);
return code;
} else {
pVgroup->status = TAOS_VG_STATUS_READY;
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
(void)sdbUpdateRow(&desc);
}
mInfo("app:%p:%p, vgId:%d, is created in mnode, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId, mInfo("app:%p:%p, vgId:%d, is created in mnode, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
pDb->name, pVgroup->numOfVnodes); pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
...@@ -508,6 +497,29 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) { ...@@ -508,6 +497,29 @@ static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
return TSDB_CODE_MND_ACTION_IN_PROGRESS; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mnodeCreateVgroupCb(SMnodeMsg *pMsg, int32_t code) {
SVgObj *pVgroup = pMsg->pVgroup;
SDbObj *pDb = pMsg->pDb;
assert(pVgroup);
if (code != TSDB_CODE_SUCCESS) {
mError("app:%p:%p, vgId:%d, failed to create in sdb, reason:%s", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
tstrerror(code));
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
sdbDeleteRow(&desc);
return code;
} else {
mInfo("app:%p:%p, vgId:%d, is created in sdb, db:%s replica:%d", pMsg->rpcMsg.ahandle, pMsg, pVgroup->vgId,
pDb->name, pVgroup->numOfVnodes);
pVgroup->status = TAOS_VG_STATUS_READY;
SSdbOper desc = {.type = SDB_OPER_GLOBAL, .pObj = pVgroup, .table = tsVgroupSdb};
(void)sdbUpdateRow(&desc);
dnodeReprocessMnodeWriteMsg(pMsg);
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
}
int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) { int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR; if (pMsg == NULL) return TSDB_CODE_MND_APP_ERROR;
SDbObj *pDb = pMsg->pDb; SDbObj *pDb = pMsg->pDb;
...@@ -527,20 +539,18 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) { ...@@ -527,20 +539,18 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg) {
mnodeIncVgroupRef(pVgroup); mnodeIncVgroupRef(pVgroup);
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb, .table = tsVgroupSdb,
.pObj = pVgroup, .pObj = pVgroup,
.rowSize = sizeof(SVgObj), .rowSize = sizeof(SVgObj),
.pMsg = pMsg, .pMsg = pMsg,
.cb = mnodeCreateVgroupCb .reqFp = mnodeCreateVgroupFp
}; };
int32_t code = sdbInsertRow(&oper); int32_t code = sdbInsertRow(&oper);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
pMsg->pVgroup = NULL; pMsg->pVgroup = NULL;
mnodeDestroyVgroup(pVgroup); mnodeDestroyVgroup(pVgroup);
} else {
code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
return code; return code;
...@@ -891,7 +901,21 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) { ...@@ -891,7 +901,21 @@ static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (mnodeMsg->received != mnodeMsg->expected) return; if (mnodeMsg->received != mnodeMsg->expected) return;
if (mnodeMsg->received == mnodeMsg->successed) { if (mnodeMsg->received == mnodeMsg->successed) {
dnodeReprocessMnodeWriteMsg(mnodeMsg); SSdbOper oper = {
.type = SDB_OPER_GLOBAL,
.table = tsVgroupSdb,
.pObj = pVgroup,
.rowSize = sizeof(SVgObj),
.pMsg = mnodeMsg,
.writeCb = mnodeCreateVgroupCb
};
int32_t code = sdbInsertRowImp(&oper);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mnodeMsg->pVgroup = NULL;
mnodeDestroyVgroup(pVgroup);
dnodeSendRpcMnodeWriteRsp(mnodeMsg, code);
}
} else { } else {
SSdbOper oper = { SSdbOper oper = {
.type = SDB_OPER_GLOBAL, .type = SDB_OPER_GLOBAL,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册