未验证 提交 a19873d7 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #12104 from taosdata/feature/dnode

refactor: adjust transaction code
......@@ -121,6 +121,10 @@ extern char tsCompressor[];
extern int32_t tsDiskCfgNum;
extern SDiskCfg tsDiskCfg[];
// internal
extern int32_t tsTransPullupMs;
extern int32_t tsMaRebalanceMs;
#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, const char *envFile,
......
......@@ -264,7 +264,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
#define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2)
#define TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4)
#define TSDB_CODE_MND_TRANS_CONFLICT TAOS_DEF_ERROR_CODE(0, 0x03D3)
#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03D4)
// mnode-mq
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)
......
......@@ -169,6 +169,10 @@ uint32_t tsMaxRange = 500; // max range
uint32_t tsCurRange = 100; // range
char tsCompressor[32] = "ZSTD_COMPRESSOR"; // ZSTD_COMPRESSOR or GZIP_COMPRESSOR
// internal
int32_t tsTransPullupMs = 6000;
int32_t tsMaRebalanceMs = 2000;
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary) {
tstrncpy(tsDiskCfg[index].dir, v1, TSDB_FILENAME_LEN);
tsDiskCfg[index].level = level;
......
......@@ -15,37 +15,83 @@
#define _DEFAULT_SOURCE
#include "tmsgcb.h"
#include "taoserror.h"
static SMsgCb tsDefaultMsgCb;
void tmsgSetDefaultMsgCb(const SMsgCb* pMsgCb) { tsDefaultMsgCb = *pMsgCb; }
int32_t tmsgPutToQueue(const SMsgCb* pMsgCb, EQueueType qtype, SRpcMsg* pReq) {
return (*pMsgCb->queueFps[qtype])(pMsgCb->pWrapper, pReq);
PutToQueueFp fp = pMsgCb->queueFps[qtype];
if (fp != NULL) {
return (*fp)(pMsgCb->pWrapper, pReq);
} else {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
}
int32_t tmsgGetQueueSize(const SMsgCb* pMsgCb, int32_t vgId, EQueueType qtype) {
return (*pMsgCb->qsizeFp)(pMsgCb->pWrapper, vgId, qtype);
GetQueueSizeFp fp = pMsgCb->qsizeFp;
if (fp != NULL) {
return (*fp)(pMsgCb->pWrapper, vgId, qtype);
} else {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
}
int32_t tmsgSendReq(const SMsgCb* pMsgCb, const SEpSet* epSet, SRpcMsg* pReq) {
return (*pMsgCb->sendReqFp)(pMsgCb->pWrapper, epSet, pReq);
SendReqFp fp = pMsgCb->sendReqFp;
if (fp != NULL) {
return (*fp)(pMsgCb->pWrapper, epSet, pReq);
} else {
terrno = TSDB_CODE_INVALID_PTR;
return -1;
}
}
void tmsgSendRsp(const SRpcMsg* pRsp) { return (*tsDefaultMsgCb.sendRspFp)(tsDefaultMsgCb.pWrapper, pRsp); }
void tmsgSendRsp(const SRpcMsg* pRsp) {
SendRspFp fp = tsDefaultMsgCb.sendRspFp;
if (fp != NULL) {
return (*fp)(tsDefaultMsgCb.pWrapper, pRsp);
} else {
terrno = TSDB_CODE_INVALID_PTR;
}
}
void tmsgSendRedirectRsp(const SRpcMsg* pRsp, const SEpSet* pNewEpSet) {
return (*tsDefaultMsgCb.sendRedirectRspFp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet);
SendRedirectRspFp fp = tsDefaultMsgCb.sendRedirectRspFp;
if (fp != NULL) {
(*fp)(tsDefaultMsgCb.pWrapper, pRsp, pNewEpSet);
} else {
terrno = TSDB_CODE_INVALID_PTR;
}
}
void tmsgRegisterBrokenLinkArg(const SMsgCb* pMsgCb, SRpcMsg* pMsg) {
(*pMsgCb->registerBrokenLinkArgFp)(pMsgCb->pWrapper, pMsg);
RegisterBrokenLinkArgFp fp = pMsgCb->registerBrokenLinkArgFp;
if (fp != NULL) {
(*fp)(pMsgCb->pWrapper, pMsg);
} else {
terrno = TSDB_CODE_INVALID_PTR;
}
}
void tmsgReleaseHandle(void* handle, int8_t type) {
(*tsDefaultMsgCb.releaseHandleFp)(tsDefaultMsgCb.pWrapper, handle, type);
ReleaseHandleFp fp = tsDefaultMsgCb.releaseHandleFp;
if (fp != NULL) {
(*fp)(tsDefaultMsgCb.pWrapper, handle, type);
} else {
terrno = TSDB_CODE_INVALID_PTR;
}
}
void tmsgReportStartup(const char* name, const char* desc) {
(*tsDefaultMsgCb.reportStartupFp)(tsDefaultMsgCb.pWrapper, name, desc);
ReportStartup fp = tsDefaultMsgCb.reportStartupFp;
if (fp != NULL && tsDefaultMsgCb.pWrapper != NULL) {
(*fp)(tsDefaultMsgCb.pWrapper, name, desc);
} else {
terrno = TSDB_CODE_INVALID_PTR;
}
}
\ No newline at end of file
......@@ -710,12 +710,12 @@ static bool mndIsStbTrans(STrans *pTrans) {
return pTrans->type > TRN_TYPE_STB_SCOPE && pTrans->type < TRN_TYPE_STB_SCOPE_END;
}
static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) {
STrans *pTrans = NULL;
void *pIter = NULL;
bool canParallel = true;
bool conflict = false;
if (mndIsBasicTrans(pNewTrans)) return canParallel;
if (mndIsBasicTrans(pNewTrans)) return conflict;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
......@@ -724,7 +724,7 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
if (mndIsGlobalTrans(pNewTrans)) {
if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
canParallel = false;
conflict = true;
} else {
}
}
......@@ -732,11 +732,11 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
else if (mndIsDbTrans(pNewTrans)) {
if (mndIsGlobalTrans(pTrans)) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
canParallel = false;
conflict = true;
} else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
if (pNewTrans->dbUid == pTrans->dbUid) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
canParallel = false;
conflict = true;
}
} else {
}
......@@ -745,11 +745,11 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
else if (mndIsStbTrans(pNewTrans)) {
if (mndIsGlobalTrans(pTrans)) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
canParallel = false;
conflict = true;
} else if (mndIsDbTrans(pTrans)) {
if (pNewTrans->dbUid == pTrans->dbUid) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
canParallel = false;
conflict = true;
}
} else {
}
......@@ -760,12 +760,12 @@ static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pTrans);
return canParallel;
return conflict;
}
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
if (!mndCheckTransCanParallel(pMnode, pTrans)) {
terrno = TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL;
if (mndCheckTransConflict(pMnode, pTrans)) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1;
}
......@@ -819,7 +819,8 @@ static int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) {
}
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
bool sendRsp = false;
bool sendRsp = false;
int32_t code = pTrans->code;
if (pTrans->stage == TRN_STAGE_FINISHED) {
sendRsp = true;
......@@ -828,12 +829,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION ||
pTrans->stage == TRN_STAGE_ROLLBACK) {
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
sendRsp = true;
}
}
if (pTrans->policy == TRN_POLICY_RETRY) {
} else {
if (pTrans->stage == TRN_STAGE_REDO_ACTION && pTrans->failedTimes > 0) {
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
sendRsp = true;
}
}
......@@ -845,13 +846,13 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
taosMemoryFree(pTrans->rpcRsp);
mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, code & 0xFFFF, pTrans->stage,
pTrans->rpcAHandle);
SRpcMsg rspMsg = {
.handle = pTrans->rpcHandle,
.ahandle = pTrans->rpcAHandle,
.refId = pTrans->rpcRefId,
.code = pTrans->code,
.code = code,
.pCont = rpcCont,
.contLen = pTrans->rpcRspLen,
};
......@@ -1099,8 +1100,8 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
} else {
pTrans->code = terrno;
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
pTrans->stage = TRN_STAGE_REDO_ACTION;
mError("trans:%d, stage from commit to redoAction since %s, failedTimes:%d", pTrans->id, terrstr(),
pTrans->stage = TRN_STAGE_UNDO_ACTION;
mError("trans:%d, stage from commit to undoAction since %s, failedTimes:%d", pTrans->id, terrstr(),
pTrans->failedTimes);
continueExec = true;
} else {
......@@ -1125,7 +1126,7 @@ static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) {
} else {
pTrans->code = terrno;
pTrans->failedTimes++;
mError("trans:%d, stage keep on commitLog since %s", pTrans->id, terrstr());
mError("trans:%d, stage keep on commitLog since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
......@@ -1161,7 +1162,7 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
continueExec = false;
} else {
pTrans->failedTimes++;
mError("trans:%d, stage keep on undoAction since %s", pTrans->id, terrstr());
mError("trans:%d, stage keep on undoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
......@@ -1178,7 +1179,7 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
continueExec = true;
} else {
pTrans->failedTimes++;
mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr());
mError("trans:%d, stage keep on rollback since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
......
......@@ -275,9 +275,6 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
char *param = strdup("====> test code to be deleted later <=====");
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
......
......@@ -43,9 +43,6 @@
#include "mndUser.h"
#include "mndVgroup.h"
#define MQ_TIMER_MS 2000
#define TRNAS_TIMER_MS 6000
static void *mndBuildTimerMsg(int32_t *pContLen) {
SMTimerReq timerReq = {0};
......@@ -68,7 +65,7 @@ static void mndPullupTrans(void *param, void *tmrId) {
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer);
taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer);
}
static void mndCalMqRebalance(void *param, void *tmrId) {
......@@ -84,7 +81,7 @@ static void mndCalMqRebalance(void *param, void *tmrId) {
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
}
taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer);
taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer);
}
static void mndPullupTelem(void *param, void *tmrId) {
......@@ -106,12 +103,12 @@ static int32_t mndInitTimer(SMnode *pMnode) {
return -1;
}
if (taosTmrReset(mndPullupTrans, TRNAS_TIMER_MS, pMnode, pMnode->timer, &pMnode->transTimer)) {
if (taosTmrReset(mndPullupTrans, tsTransPullupMs, pMnode, pMnode->timer, &pMnode->transTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (taosTmrReset(mndCalMqRebalance, MQ_TIMER_MS, pMnode, pMnode->timer, &pMnode->mqTimer)) {
if (taosTmrReset(mndCalMqRebalance, tsMaRebalanceMs, pMnode, pMnode->timer, &pMnode->mqTimer)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
......
......@@ -19,7 +19,7 @@ void reportStartup(SMgmtWrapper *pWrapper, const char *name, const char *desc) {
class MndTestTrans2 : public ::testing::Test {
protected:
static void SetUpTestSuite() {
static void InitLog() {
dDebugFlag = 143;
vDebugFlag = 0;
mDebugFlag = 207;
......@@ -42,9 +42,9 @@ class MndTestTrans2 : public ::testing::Test {
if (taosInitLog("taosdlog", 1) != 0) {
printf("failed to init log file\n");
}
}
walInit();
static void InitMnode() {
static SMsgCb msgCb = {0};
msgCb.reportStartupFp = reportStartup;
msgCb.pWrapper = (SMgmtWrapper *)(&msgCb); // hack
......@@ -58,12 +58,22 @@ class MndTestTrans2 : public ::testing::Test {
strcpy(opt.replicas[0].fqdn, "localhost");
opt.msgCb = msgCb;
tsTransPullupMs = 1000;
const char *mnodepath = "/tmp/mnode_test_trans";
taosRemoveDir(mnodepath);
pMnode = mndOpen(mnodepath, &opt);
mndStart(pMnode);
}
static void SetUpTestSuite() {
InitLog();
walInit();
InitMnode();
}
static void TearDownTestSuite() {
mndStop(pMnode);
mndClose(pMnode);
walCleanUp();
taosCloseLog();
......@@ -76,11 +86,11 @@ class MndTestTrans2 : public ::testing::Test {
void SetUp() override {}
void TearDown() override {}
void CreateUser(const char *user) {
int32_t CreateUser(const char *acct, const char *user) {
SUserObj userObj = {0};
taosEncryptPass_c((uint8_t *)"taosdata", strlen("taosdata"), userObj.pass);
tstrncpy(userObj.user, user, TSDB_USER_LEN);
tstrncpy(userObj.acct, "root", TSDB_USER_LEN);
tstrncpy(userObj.acct, acct, TSDB_USER_LEN);
userObj.createdTime = taosGetTimestampMs();
userObj.updateTime = userObj.createdTime;
userObj.superUser = 1;
......@@ -94,19 +104,34 @@ class MndTestTrans2 : public ::testing::Test {
char *param = strdup("====> test param <=====");
mndTransSetCb(pTrans, TEST_TRANS_START_FUNC, TEST_TRANS_STOP_FUNC, param, strlen(param) + 1);
mndTransPrepare(pMnode, pTrans);
int32_t code = mndTransPrepare(pMnode, pTrans);
mndTransDrop(pTrans);
return code;
}
};
SMnode *MndTestTrans2::pMnode;
TEST_F(MndTestTrans2, 01_CbFunc) {
ASSERT_NE(pMnode, nullptr);
const char *acct = "root";
const char *acct_invalid = "root1";
const char *user1 = "test1";
CreateUser(user1);
const char *user2 = "test2";
SUserObj *pUser1 = NULL;
SUserObj *pUser2 = NULL;
SUserObj *pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pMnode, nullptr);
// create user success
EXPECT_EQ(CreateUser(acct, user1), 0);
pUser1 = mndAcquireUser(pMnode, user1);
ASSERT_NE(pUser1, nullptr);
// failed to create user and rollback
EXPECT_EQ(CreateUser(acct_invalid, user2), 0);
pUser2 = mndAcquireUser(pMnode, user2);
ASSERT_EQ(pUser2, nullptr);
mndTransPullup(pMnode);
}
......@@ -271,7 +271,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL, "Conflicting transaction not completed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CONFLICT, "Conflict transaction not completed")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error")
// mnode-mq
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册