未验证 提交 b0a2bcce 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #13927 from taosdata/fix/mnode

enh(mnode): transaction conflict supports two db
......@@ -23,22 +23,22 @@ extern "C" {
// If the error is in a third-party library, place this header file under the third-party library header file.
// When you want to use this feature, you should find or add the same function in the following section.
#ifndef ALLOW_FORBID_FUNC
#define strptime STRPTIME_FUNC_TAOS_FORBID
#define gettimeofday GETTIMEOFDAY_FUNC_TAOS_FORBID
#define localtime LOCALTIME_FUNC_TAOS_FORBID
#define localtime_s LOCALTIMES_FUNC_TAOS_FORBID
#define localtime_r LOCALTIMER_FUNC_TAOS_FORBID
#define time TIME_FUNC_TAOS_FORBID
#define mktime MKTIME_FUNC_TAOS_FORBID
#define strptime STRPTIME_FUNC_TAOS_FORBID
#define gettimeofday GETTIMEOFDAY_FUNC_TAOS_FORBID
#define localtime LOCALTIME_FUNC_TAOS_FORBID
#define localtime_s LOCALTIMES_FUNC_TAOS_FORBID
#define localtime_r LOCALTIMER_FUNC_TAOS_FORBID
#define time TIME_FUNC_TAOS_FORBID
#define mktime MKTIME_FUNC_TAOS_FORBID
#endif
#ifdef WINDOWS
#define CLOCK_REALTIME 0
#define CLOCK_REALTIME 0
#define MILLISECOND_PER_SECOND (1000i64)
#define MILLISECOND_PER_SECOND (1000i64)
#else
#define MILLISECOND_PER_SECOND ((int64_t)1000L)
#define MILLISECOND_PER_SECOND ((int64_t)1000L)
#endif
#define MILLISECOND_PER_MINUTE (MILLISECOND_PER_SECOND * 60)
......@@ -82,13 +82,13 @@ static FORCE_INLINE int64_t taosGetTimestampNs() {
return (int64_t)systemTime.tv_sec * 1000000000L + (int64_t)systemTime.tv_nsec;
}
char *taosStrpTime(const char *buf, const char *fmt, struct tm *tm);
char * taosStrpTime(const char *buf, const char *fmt, struct tm *tm);
struct tm *taosLocalTime(const time_t *timep, struct tm *result);
time_t taosTime(time_t *t);
time_t taosMktime(struct tm *timep);
time_t taosTime(time_t *t);
time_t taosMktime(struct tm *timep);
#ifdef __cplusplus
}
#endif
#endif /*_TD_OS_TIME_H_*/
#endif /*_TD_OS_TIME_H_*/
......@@ -220,7 +220,8 @@ static const SSysDbTableSchema transSchema[] = {
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "db1", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "db2", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "last_action_info",
......
......@@ -124,7 +124,8 @@ typedef struct {
int32_t lastErrorNo;
tmsg_t lastMsgType;
SEpSet lastEpset;
char dbname[TSDB_DB_FNAME_LEN];
char dbname1[TSDB_DB_FNAME_LEN];
char dbname2[TSDB_DB_FNAME_LEN];
int32_t startFunc;
int32_t stopFunc;
int32_t paramLen;
......
......@@ -68,7 +68,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbName(STrans *pTrans, const char *dbname);
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2);
void mndTransSetSerial(STrans *pTrans);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
......
......@@ -477,7 +477,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
mndTransSetDbName(pTrans, dbObj.name);
mndTransSetDbName(pTrans, dbObj.name, NULL);
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
......@@ -668,7 +668,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name);
int32_t code = -1;
mndTransSetDbName(pTrans, pOld->name);
mndTransSetDbName(pTrans, pOld->name, NULL);
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
......@@ -921,7 +921,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
mndTransSetDbName(pTrans, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
......
......@@ -609,7 +609,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
mndTransSetDbName(pTrans, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
mndTransSetSerial(pTrans);
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
......@@ -852,7 +852,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
mndTransSetDbName(pTrans, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
......
......@@ -758,7 +758,7 @@ _OVER:
}
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
mndTransSetDbName(pTrans, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
......@@ -1396,7 +1396,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
mndTransSetDbName(pTrans, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (needRsp) {
void *pCont = NULL;
......@@ -1537,7 +1537,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
mndTransSetDbName(pTrans, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
......
......@@ -613,9 +613,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
mndTransSetDbName(pTrans, createStreamReq.sourceDB);
mndTransSetDbName(pTrans, createStreamReq.sourceDB, NULL);
// TODO
/*mndTransSetDbName(pTrans, streamObj.targetDb);*/
/*mndTransSetDbName(pTrans, streamObj.targetDb, NULL);*/
mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
// build stream obj from request
......
......@@ -403,7 +403,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOutputObj *pOutput) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pMsg);
mndTransSetDbName(pTrans, pOutput->pSub->dbName);
mndTransSetDbName(pTrans, pOutput->pSub->dbName, NULL);
if (pTrans == NULL) return -1;
// make txn:
......
......@@ -566,7 +566,7 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
#endif
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq);
mndTransSetDbName(pTrans, pTopic->db);
mndTransSetDbName(pTrans, pTopic->db, NULL);
if (pTrans == NULL) {
mError("topic:%s, failed to drop since %s", pTopic->name, terrstr());
return -1;
......
......@@ -122,7 +122,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT8(pRaw, dataPos, pTrans->conflict, _OVER)
SDB_SET_INT8(pRaw, dataPos, pTrans->exec, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
......@@ -270,7 +271,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans->conflict = conflict;
pTrans->exec = exec;
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
......@@ -649,7 +651,14 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *
pTrans->paramLen = paramLen;
}
void mndTransSetDbName(STrans *pTrans, const char *dbname) { memcpy(pTrans->dbname, dbname, TSDB_DB_FNAME_LEN); }
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) {
if (dbname1 != NULL) {
memcpy(pTrans->dbname1, dbname1, TSDB_DB_FNAME_LEN);
}
if (dbname2 != NULL) {
memcpy(pTrans->dbname2, dbname2, TSDB_DB_FNAME_LEN);
}
}
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
......@@ -674,6 +683,12 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
return 0;
}
static bool mndCheckDbConflict(const char *db, STrans *pTrans) {
if (db[0] == 0) return false;
if (strcmp(db, pTrans->dbname1) == 0 || strcmp(db, pTrans->dbname2) == 0) return true;
return false;
}
static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
STrans *pTrans = NULL;
void *pIter = NULL;
......@@ -688,14 +703,21 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
if (pNew->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pNew->conflict == TRN_CONFLICT_DB) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (mndCheckDbConflict(pNew->dbname1, pTrans)) conflict = true;
if (mndCheckDbConflict(pNew->dbname2, pTrans)) conflict = true;
}
}
if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB && strcmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB) {
if (mndCheckDbConflict(pNew->dbname1, pTrans)) conflict = true;
if (mndCheckDbConflict(pNew->dbname2, pTrans)) conflict = true;
}
}
mError("trans:%d, can't execute since conflict with trans:%d, db:%s", pNew->id, pTrans->id, pTrans->dbname);
mError("trans:%d, can't execute since conflict with trans:%d, db1:%s db2:%s", pNew->id, pTrans->id, pTrans->dbname1,
pTrans->dbname2);
sdbRelease(pMnode->pSdb, pTrans);
}
......@@ -704,7 +726,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (strlen(pTrans->dbname) == 0) {
if (strlen(pTrans->dbname1) == 0 && strlen(pTrans->dbname2) == 0) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare conflict db not set", pTrans->id);
return -1;
......@@ -1451,10 +1473,15 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)stage, false);
char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(pTrans->dbname), pShow->pMeta->pSchemas[cols].bytes);
char dbname1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(dbname1, mndGetDbStr(pTrans->dbname1), pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)dbname1, false);
char dbname2[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(dbname2, mndGetDbStr(pTrans->dbname2), pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)dbname, false);
colDataAppend(pColInfo, numOfRows, (const char *)dbname2, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false);
......
......@@ -128,7 +128,7 @@ class MndTestTrans2 : public ::testing::Test {
mndTransSetCb(pTrans, TRANS_START_FUNC_TEST, TRANS_STOP_FUNC_TEST, param, strlen(param) + 1);
if (pDb != NULL) {
mndTransSetDbName(pTrans, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
}
int32_t code = mndTransPrepare(pMnode, pTrans);
......@@ -201,7 +201,7 @@ class MndTestTrans2 : public ::testing::Test {
}
if (pDb != NULL) {
mndTransSetDbName(pTrans, pDb->name);
mndTransSetDbName(pTrans, pDb->name, NULL);
}
int32_t code = mndTransPrepare(pMnode, pTrans);
......
......@@ -376,17 +376,19 @@ static void transDQTimeout(uv_timer_t* timer) {
SDelayQueue* queue = timer->data;
tTrace("timer %p timeout", timer);
uint64_t timeout = 0;
int64_t current = taosGetTimestampMs();
do {
HeapNode* minNode = heapMin(queue->heap);
if (minNode == NULL) break;
SDelayTask* task = container_of(minNode, SDelayTask, node);
if (task->execTime <= taosGetTimestampMs()) {
if (task->execTime <= current) {
heapRemove(queue->heap, minNode);
task->func(task->arg);
taosMemoryFree(task);
timeout = 0;
} else {
timeout = task->execTime - taosGetTimestampMs();
timeout = task->execTime - current;
break;
}
} while (1);
......
......@@ -283,7 +283,8 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject(item, "conflict", pObj->conflict);
tjsonAddIntegerToObject(item, "exec", pObj->exec);
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
tjsonAddStringToObject(item, "dbname", pObj->dbname);
tjsonAddStringToObject(item, "dbname1", pObj->dbname1);
tjsonAddStringToObject(item, "dbname2", pObj->dbname2);
tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions));
tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions));
tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册