未验证 提交 04bb5ca6 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #15103 from taosdata/fix/mnode

enh: If an 'object is creating' error is reported when creating a database, wait for the transaction to end then response
......@@ -35,6 +35,46 @@
extern "C" {
#endif
typedef enum {
MND_OPER_CONNECT = 1,
MND_OPER_CREATE_ACCT,
MND_OPER_DROP_ACCT,
MND_OPER_ALTER_ACCT,
MND_OPER_CREATE_USER,
MND_OPER_DROP_USER,
MND_OPER_ALTER_USER,
MND_OPER_CREATE_BNODE,
MND_OPER_DROP_BNODE,
MND_OPER_CREATE_DNODE,
MND_OPER_DROP_DNODE,
MND_OPER_CONFIG_DNODE,
MND_OPER_CREATE_MNODE,
MND_OPER_DROP_MNODE,
MND_OPER_CREATE_QNODE,
MND_OPER_DROP_QNODE,
MND_OPER_CREATE_SNODE,
MND_OPER_DROP_SNODE,
MND_OPER_REDISTRIBUTE_VGROUP,
MND_OPER_MERGE_VGROUP,
MND_OPER_SPLIT_VGROUP,
MND_OPER_BALANCE_VGROUP,
MND_OPER_CREATE_FUNC,
MND_OPER_DROP_FUNC,
MND_OPER_KILL_TRANS,
MND_OPER_KILL_CONN,
MND_OPER_KILL_QUERY,
MND_OPER_CREATE_DB,
MND_OPER_ALTER_DB,
MND_OPER_DROP_DB,
MND_OPER_COMPACT_DB,
MND_OPER_TRIM_DB,
MND_OPER_USE_DB,
MND_OPER_WRITE_DB,
MND_OPER_READ_DB,
MND_OPER_READ_OR_WRITE_DB,
MND_OPER_SHOW_VARIBALES,
} EOperType;
typedef enum {
MND_AUTH_ACCT_START = 0,
MND_AUTH_ACCT_USER,
......@@ -109,9 +149,9 @@ typedef struct {
ETrnPolicy policy;
ETrnConflct conflict;
ETrnExec exec;
EOperType oper;
int32_t code;
int32_t failedTimes;
SRpcHandleInfo rpcInfo;
void* rpcRsp;
int32_t rpcRspLen;
int32_t redoActionPos;
......@@ -130,6 +170,7 @@ typedef struct {
int32_t stopFunc;
int32_t paramLen;
void* param;
SArray* pRpcArray;
} STrans;
typedef struct {
......
......@@ -22,46 +22,6 @@
extern "C" {
#endif
typedef enum {
MND_OPER_CONNECT = 1,
MND_OPER_CREATE_ACCT,
MND_OPER_DROP_ACCT,
MND_OPER_ALTER_ACCT,
MND_OPER_CREATE_USER,
MND_OPER_DROP_USER,
MND_OPER_ALTER_USER,
MND_OPER_CREATE_BNODE,
MND_OPER_DROP_BNODE,
MND_OPER_CREATE_DNODE,
MND_OPER_DROP_DNODE,
MND_OPER_CONFIG_DNODE,
MND_OPER_CREATE_MNODE,
MND_OPER_DROP_MNODE,
MND_OPER_CREATE_QNODE,
MND_OPER_DROP_QNODE,
MND_OPER_CREATE_SNODE,
MND_OPER_DROP_SNODE,
MND_OPER_REDISTRIBUTE_VGROUP,
MND_OPER_MERGE_VGROUP,
MND_OPER_SPLIT_VGROUP,
MND_OPER_BALANCE_VGROUP,
MND_OPER_CREATE_FUNC,
MND_OPER_DROP_FUNC,
MND_OPER_KILL_TRANS,
MND_OPER_KILL_CONN,
MND_OPER_KILL_QUERY,
MND_OPER_CREATE_DB,
MND_OPER_ALTER_DB,
MND_OPER_DROP_DB,
MND_OPER_COMPACT_DB,
MND_OPER_TRIM_DB,
MND_OPER_USE_DB,
MND_OPER_WRITE_DB,
MND_OPER_READ_DB,
MND_OPER_READ_OR_WRITE_DB,
MND_OPER_SHOW_VARIBALES,
} EOperType;
int32_t mndInitPrivilege(SMnode *pMnode);
void mndCleanupPrivilege(SMnode *pMnode);
......
......@@ -73,12 +73,14 @@ void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2);
void mndTransSetSerial(STrans *pTrans);
void mndTransSetOper(STrans *pTrans, EOperType oper);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
int32_t mndTransProcessRsp(SRpcMsg *pRsp);
void mndTransPullup(SMnode *pMnode);
int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans);
void mndTransExecute(SMnode *pMnode, STrans *pTrans);
int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname);
#ifdef __cplusplus
}
......
......@@ -487,6 +487,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
mndTransSetDbName(pTrans, dbObj.name, NULL);
mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
......@@ -534,6 +535,14 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
terrno = TSDB_CODE_MND_DB_ALREADY_EXIST;
goto _OVER;
}
} else if (terrno == TSDB_CODE_SDB_OBJ_CREATING) {
if (mndSetRpcInfoForDbTrans(pMnode, pReq, MND_OPER_CREATE_DB, createReq.db) == 0) {
mDebug("db:%s, is creating and response after trans finished", createReq.db);
code = TSDB_CODE_ACTION_IN_PROGRESS;
goto _OVER;
} else {
goto _OVER;
}
} else if (terrno != TSDB_CODE_MND_DB_NOT_EXIST) {
goto _OVER;
}
......
......@@ -122,6 +122,10 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT8(pRaw, dataPos, pTrans->policy, _OVER)
SDB_SET_INT8(pRaw, dataPos, pTrans->conflict, _OVER)
SDB_SET_INT8(pRaw, dataPos, pTrans->exec, _OVER)
SDB_SET_INT8(pRaw, dataPos, pTrans->oper, _OVER)
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER)
......@@ -269,15 +273,22 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
int8_t policy = 0;
int8_t conflict = 0;
int8_t exec = 0;
int8_t oper = 0;
int8_t reserved = 0;
int8_t actionType = 0;
SDB_GET_INT8(pRaw, dataPos, &stage, _OVER)
SDB_GET_INT8(pRaw, dataPos, &policy, _OVER)
SDB_GET_INT8(pRaw, dataPos, &conflict, _OVER)
SDB_GET_INT8(pRaw, dataPos, &exec, _OVER)
SDB_GET_INT8(pRaw, dataPos, &oper, _OVER)
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
pTrans->stage = stage;
pTrans->policy = policy;
pTrans->conflict = conflict;
pTrans->exec = exec;
pTrans->oper = oper;
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER)
......@@ -495,6 +506,10 @@ static void mndTransDropData(STrans *pTrans) {
mndTransDropActions(pTrans->commitActions);
pTrans->commitActions = NULL;
}
if (pTrans->pRpcArray != NULL) {
taosArrayDestroy(pTrans->pRpcArray);
pTrans->pRpcArray = NULL;
}
if (pTrans->rpcRsp != NULL) {
taosMemoryFree(pTrans->rpcRsp);
pTrans->rpcRsp = NULL;
......@@ -585,14 +600,18 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL) {
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL ||
pTrans->pRpcArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to create transaction since %s", terrstr());
return NULL;
}
if (pReq != NULL) pTrans->rpcInfo = pReq->info;
if (pReq != NULL) {
taosArrayPush(pTrans->pRpcArray, &pReq->info);
}
mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
return pTrans;
}
......@@ -677,6 +696,31 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *
pTrans->paramLen = paramLen;
}
int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, const char *dbname) {
STrans *pTrans = NULL;
void *pIter = NULL;
int32_t code = -1;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) break;
if (pTrans->oper == oper) {
if (strcasecmp(dbname, pTrans->dbname1) == 0) {
mDebug("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper);
if (taosArrayPush(pTrans->pRpcArray, &pMsg->info) != NULL) {
code = 0;
}
sdbRelease(pMnode->pSdb, pTrans);
break;
}
}
sdbRelease(pMnode->pSdb, pTrans);
}
return code;
}
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) {
if (dbname1 != NULL) {
memcpy(pTrans->dbname1, dbname1, TSDB_DB_FNAME_LEN);
......@@ -688,6 +732,8 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2)
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
if (pRaw == NULL) {
......@@ -711,7 +757,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
static bool mndCheckDbConflict(const char *db, STrans *pTrans) {
if (db[0] == 0) return false;
if (strcmp(db, pTrans->dbname1) == 0 || strcmp(db, pTrans->dbname2) == 0) return true;
if (strcasecmp(db, pTrans->dbname1) == 0 || strcasecmp(db, pTrans->dbname2) == 0) return true;
return false;
}
......@@ -784,9 +830,10 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1;
}
pNew->rpcInfo = pTrans->rpcInfo;
pNew->pRpcArray = pTrans->pRpcArray;
pNew->rpcRsp = pTrans->rpcRsp;
pNew->rpcRspLen = pTrans->rpcRspLen;
pTrans->pRpcArray = NULL;
pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0;
......@@ -835,29 +882,34 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
}
if (sendRsp && pTrans->rpcInfo.handle != NULL) {
mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage),
pTrans->rpcInfo.ahandle);
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL;
}
SRpcMsg rspMsg = {.code = code, .info = pTrans->rpcInfo};
if (pTrans->rpcRspLen != 0) {
void *rpcCont = rpcMallocCont(pTrans->rpcRspLen);
if (rpcCont != NULL) {
memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen);
rspMsg.pCont = rpcCont;
rspMsg.contLen = pTrans->rpcRspLen;
if (!sendRsp) return;
int32_t size = taosArrayGetSize(pTrans->pRpcArray);
if (size <= 0) return;
for (int32_t i = 0; i < size; ++i) {
SRpcHandleInfo *pInfo = taosArrayGet(pTrans->pRpcArray, i);
if (pInfo->handle != NULL) {
mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage),
pInfo->ahandle);
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL;
}
SRpcMsg rspMsg = {.code = code, .info = *pInfo};
if (pTrans->rpcRspLen != 0) {
void *rpcCont = rpcMallocCont(pTrans->rpcRspLen);
if (rpcCont != NULL) {
memcpy(rpcCont, pTrans->rpcRsp, pTrans->rpcRspLen);
rspMsg.pCont = rpcCont;
rspMsg.contLen = pTrans->rpcRspLen;
}
}
taosMemoryFree(pTrans->rpcRsp);
}
tmsgSendRsp(&rspMsg);
pTrans->rpcInfo.handle = NULL;
pTrans->rpcRsp = NULL;
pTrans->rpcRspLen = 0;
tmsgSendRsp(&rspMsg);
}
}
taosArrayClear(pTrans->pRpcArray);
}
int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
......
......@@ -248,8 +248,8 @@
./test.sh -f tsim/stream/sliding.sim
# ---- transaction
./test.sh -f tsim/trans/lossdata1.sim
./test.sh -f tsim/trans/create_db.sim
./test.sh -f tsim/trans/lossdata1.sim
./test.sh -f tsim/trans/create_db.sim
# ---- tmq
./test.sh -f tsim/tmq/basic1.sim
......
......@@ -152,7 +152,7 @@ endi
system_content sh/checkValgrind.sh -n dnode2
print cmd return result ----> [ $system_content ]
if $system_content > 4 then
if $system_content > 0 then
return -1
endi
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册