提交 e68772df 编写于 作者: G gccgdb1234

Merge branch '3.0' of github.com:taosdata/TDengine into 3.0

......@@ -254,6 +254,7 @@ typedef enum ELogicConditionType {
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16
#define TSDB_TRANS_ERROR_LEN 64
#define TSDB_TRANS_DESC_LEN 128
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128
......
......@@ -130,7 +130,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
_OVER:
if (code != 0) {
dError("msg:%p, failed to process since %s", pMsg, terrstr());
dTrace("msg:%p, failed to process since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pRpc->msgType));
if (terrno != 0) code = terrno;
if (IsReq(pRpc)) {
......
......@@ -60,14 +60,12 @@ typedef enum {
typedef enum {
TRN_STAGE_PREPARE = 0,
TRN_STAGE_REDO_LOG = 1,
TRN_STAGE_REDO_ACTION = 2,
TRN_STAGE_ROLLBACK = 3,
TRN_STAGE_UNDO_ACTION = 4,
TRN_STAGE_UNDO_LOG = 5,
TRN_STAGE_COMMIT = 6,
TRN_STAGE_COMMIT_LOG = 7,
TRN_STAGE_FINISHED = 8
TRN_STAGE_REDO_ACTION = 1,
TRN_STAGE_ROLLBACK = 2,
TRN_STAGE_UNDO_ACTION = 3,
TRN_STAGE_COMMIT = 4,
TRN_STAGE_COMMIT_ACTION = 5,
TRN_STAGE_FINISHED = 6
} ETrnStage;
typedef enum {
......@@ -131,7 +129,7 @@ typedef enum {
typedef enum {
TRN_EXEC_PARALLEL = 0,
TRN_EXEC_ONE_BY_ONE = 1,
TRN_EXEC_NO_PARALLEL = 1,
} ETrnExecType;
typedef enum {
......@@ -168,16 +166,16 @@ typedef struct {
SRpcHandleInfo rpcInfo;
void* rpcRsp;
int32_t rpcRspLen;
SArray* redoLogs;
SArray* undoLogs;
SArray* commitLogs;
int32_t redoActionPos;
SArray* redoActions;
SArray* undoActions;
SArray* commitActions;
int64_t createdTime;
int64_t lastExecTime;
int64_t dbUid;
char dbname[TSDB_DB_FNAME_LEN];
char lastError[TSDB_TRANS_ERROR_LEN];
char desc[TSDB_TRANS_DESC_LEN];
int32_t startFunc;
int32_t stopFunc;
int32_t paramLen;
......
......@@ -26,31 +26,24 @@ typedef enum {
TRANS_START_FUNC_TEST = 1,
TRANS_STOP_FUNC_TEST = 2,
TRANS_START_FUNC_MQ_REB = 3,
TRANS_STOP_FUNC_TEST_MQ_REB = 4,
TRANS_STOP_FUNC_MQ_REB = 4,
} ETrnFunc;
typedef struct {
SEpSet epSet;
tmsg_t msgType;
int8_t msgSent;
int8_t msgReceived;
int32_t id;
int32_t errCode;
int32_t acceptableCode;
int8_t stage;
int8_t isRaw;
int8_t rawWritten;
int8_t msgSent;
int8_t msgReceived;
tmsg_t msgType;
SEpSet epSet;
int32_t contLen;
void *pCont;
} STransAction;
typedef struct {
SSdbRaw *pRaw;
} STransLog;
typedef struct {
ETrnStep stepType;
STransAction redoAction;
STransAction undoAction;
STransLog redoLog;
STransLog undoLog;
} STransStep;
} STransAction;
typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);
......@@ -69,7 +62,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
void mndTransSetExecOneByOne(STrans *pTrans);
void mndTransSetNoParallel(STrans *pTrans);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SRpcMsg *pRsp);
......
......@@ -78,10 +78,8 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("acct:%s, will be created while deploy sdb, raw:%p", acctObj.acct, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
mDebug("acct:%s, will be created when deploying, raw:%p", acctObj.acct, pRaw);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_ACCT, NULL);
if (pTrans == NULL) {
mError("acct:%s, failed to create since %s", acctObj.acct, terrstr());
......@@ -94,7 +92,6 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......@@ -104,7 +101,6 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
mndTransDrop(pTrans);
return 0;
#endif
}
static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct) {
......
......@@ -172,13 +172,13 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
clusterObj.id = mndGenerateUid(clusterObj.name, TSDB_CLUSTER_ID_LEN);
clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id);
pMnode->clusterId = clusterObj.id;
mDebug("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name);
mInfo("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name);
SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("cluster:%" PRId64 ", will be created while deploy sdb, raw:%p", clusterObj.id, pRaw);
mDebug("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
#else
......
......@@ -1314,7 +1314,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
SDbObj *pDb = mndAcquireDb(pMnode, pDbVgVersion->dbFName);
if (pDb == NULL) {
mDebug("db:%s, no exist", pDbVgVersion->dbFName);
mTrace("db:%s, no exist", pDbVgVersion->dbFName);
memcpy(usedbRsp.db, pDbVgVersion->dbFName, TSDB_DB_FNAME_LEN);
usedbRsp.uid = pDbVgVersion->dbId;
usedbRsp.vgVersion = -1;
......
......@@ -98,7 +98,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
if (pRaw == NULL) return -1;
if (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) return -1;
mDebug("dnode:%d, will be created while deploy sdb, raw:%p", dnodeObj.id, pRaw);
mDebug("dnode:%d, will be created when deploying, raw:%p", dnodeObj.id, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
......@@ -388,9 +388,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
mndReleaseMnode(pMnode, pObj);
}
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
bool dnodeChanged = (statusReq.dnodeVer != sdbGetTableVer(pMnode->pSdb, SDB_DNODE));
bool dnodeChanged = (statusReq.dnodeVer != dnodeVer);
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool needCheck = !online || dnodeChanged || reboot;
......@@ -433,7 +434,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
if (!online) {
mInfo("dnode:%d, from offline to online", pDnode->id);
} else {
mDebug("dnode:%d, send dnode eps", pDnode->id);
mDebug("dnode:%d, send dnode epset, online:%d ver:% " PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
statusReq.dnodeVer, dnodeVer, reboot);
}
pDnode->rebootTime = statusReq.rebootTime;
......@@ -441,7 +443,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
SStatusRsp statusRsp = {0};
statusRsp.dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
statusRsp.dnodeVer = dnodeVer;
statusRsp.dnodeCfg.dnodeId = pDnode->id;
statusRsp.dnodeCfg.clusterId = pMnode->clusterId;
statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp));
......
......@@ -472,7 +472,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
} else if (code == 0) {
mTrace("msg:%p, successfully processed and response", pMsg);
} else {
mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
mDebug("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle,
TMSG_INFO(pMsg->msgType));
}
......
......@@ -90,7 +90,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("mnode:%d, will be created while deploy sdb, raw:%p", mnodeObj.id, pRaw);
mDebug("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
......@@ -367,7 +367,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode,
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
mndTransSetExecOneByOne(pTrans);
mndTransSetNoParallel(pTrans);
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
......@@ -539,7 +539,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) {
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
mndTransSetExecOneByOne(pTrans);
mndTransSetNoParallel(pTrans);
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
......
......@@ -507,7 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
mndTransSetDbInfo(pTrans, pDb);
mndTransSetExecOneByOne(pTrans);
mndTransSetNoParallel(pTrans);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
......
......@@ -1597,7 +1597,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
pReq->info.rspLen = rspLen;
code = 0;
mDebug("stb:%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName);
mTrace("%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName);
_OVER:
if (code != 0) {
......
......@@ -501,7 +501,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
// 4. TODO commit log: modification log
// 5. set cb
mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_TEST_MQ_REB, NULL, 0);
mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
// 6. execution
if (mndTransPrepare(pMnode, pTrans) != 0) {
......
......@@ -65,7 +65,7 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
void mndRestoreFinish(struct SSyncFSM *pFsm) {
SMnode *pMnode = pFsm->data;
if (!pMnode->deploy) {
mInfo("mnode sync restore finished");
mInfo("mnode sync restore finished, and will handle outstanding transactions");
mndTransPullup(pMnode);
mndSetRestore(pMnode, true);
} else {
......@@ -244,7 +244,7 @@ void mndSyncStart(SMnode *pMnode) {
} else {
syncStart(pMgmt->sync);
}
mDebug("sync:%" PRId64 " is started, standby:%d", pMgmt->sync, pMgmt->standby);
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
}
void mndSyncStop(SMnode *pMnode) {}
......
......@@ -37,19 +37,18 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
static void mndTransDropLogs(SArray *pArray);
static void mndTransDropActions(SArray *pArray);
static void mndTransDropData(STrans *pTrans);
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray);
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
......@@ -83,40 +82,30 @@ int32_t mndInitTrans(SMnode *pMnode) {
void mndCleanupTrans(SMnode *pMnode) {}
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
static int32_t mndTransGetActionsSize(SArray *pArray) {
int32_t actionNum = taosArrayGetSize(pArray);
int32_t rawDataLen = 0;
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE;
int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
for (int32_t i = 0; i < redoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
for (int32_t i = 0; i < actionNum; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
if (pAction->isRaw) {
rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t));
} else {
rawDataLen += (sizeof(STransAction) + pAction->contLen);
}
for (int32_t i = 0; i < undoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
rawDataLen += sizeof(pAction->isRaw);
}
for (int32_t i = 0; i < commitLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
}
return rawDataLen;
}
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
rawDataLen += (sizeof(STransAction) + pAction->contLen);
}
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
rawDataLen += (sizeof(STransAction) + pAction->contLen);
}
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE;
rawDataLen += mndTransGetActionsSize(pTrans->redoActions);
rawDataLen += mndTransGetActionsSize(pTrans->undoActions);
rawDataLen += mndTransGetActionsSize(pTrans->commitActions);
SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TRANS_VER_NUMBER, rawDataLen);
if (pRaw == NULL) {
......@@ -126,67 +115,85 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER)
ETrnStage stage = pTrans->stage;
if (stage == TRN_STAGE_REDO_LOG || stage == TRN_STAGE_REDO_ACTION) {
stage = TRN_STAGE_PREPARE;
} else if (stage == TRN_STAGE_UNDO_ACTION || stage == TRN_STAGE_UNDO_LOG) {
stage = TRN_STAGE_ROLLBACK;
} else if (stage == TRN_STAGE_COMMIT_LOG || stage == TRN_STAGE_FINISHED) {
stage = TRN_STAGE_COMMIT;
} else {
}
SDB_SET_INT16(pRaw, dataPos, stage, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->stage, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, redoLogNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, undoLogNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, commitLogNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
int32_t commitActionNum = taosArrayGetSize(pTrans->commitActions);
SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER)
for (int32_t i = 0; i < redoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
}
for (int32_t i = 0; i < undoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
}
for (int32_t i = 0; i < commitLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
}
SDB_SET_INT32(pRaw, dataPos, commitActionNum, _OVER)
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
if (pAction->isRaw) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
}
}
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
if (pAction->isRaw) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
}
}
for (int32_t i = 0; i < commitActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->commitActions, i);
SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER)
if (pAction->isRaw) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER)
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
}
}
SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER)
......@@ -220,11 +227,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
char *pData = NULL;
int32_t dataLen = 0;
int8_t sver = 0;
int32_t redoLogNum = 0;
int32_t undoLogNum = 0;
int32_t commitLogNum = 0;
int32_t redoActionNum = 0;
int32_t undoActionNum = 0;
int32_t commitActionNum = 0;
int32_t dataPos = 0;
STransAction action = {0};
......@@ -256,79 +261,106 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans->type = type;
pTrans->parallel = parallel;
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &redoLogNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoLogNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &commitLogNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &commitActionNum, _OVER)
pTrans->redoLogs = taosArrayInit(redoLogNum, sizeof(void *));
pTrans->undoLogs = taosArrayInit(undoLogNum, sizeof(void *));
pTrans->commitLogs = taosArrayInit(commitLogNum, sizeof(void *));
pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(commitActionNum, sizeof(STransAction));
if (pTrans->redoLogs == NULL) goto _OVER;
if (pTrans->undoLogs == NULL) goto _OVER;
if (pTrans->commitLogs == NULL) goto _OVER;
if (pTrans->redoActions == NULL) goto _OVER;
if (pTrans->undoActions == NULL) goto _OVER;
if (pTrans->commitActions == NULL) goto _OVER;
for (int32_t i = 0; i < redoLogNum; ++i) {
for (int32_t i = 0; i < redoActionNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
if (action.isRaw) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
pData = taosMemoryMalloc(dataLen);
if (pData == NULL) goto _OVER;
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER);
if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto _OVER;
pData = NULL;
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
}
for (int32_t i = 0; i < undoLogNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
pData = taosMemoryMalloc(dataLen);
if (pData == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER);
if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto _OVER;
pData = NULL;
}
for (int32_t i = 0; i < commitLogNum; ++i) {
for (int32_t i = 0; i < undoActionNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
if (action.isRaw) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
pData = taosMemoryMalloc(dataLen);
if (pData == NULL) goto _OVER;
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER);
if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto _OVER;
pData = NULL;
}
for (int32_t i = 0; i < redoActionNum; ++i) {
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
}
}
for (int32_t i = 0; i < undoActionNum; ++i) {
for (int32_t i = 0; i < commitActionNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER)
if (action.isRaw) {
SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER)
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
action.pRaw = taosMemoryMalloc(dataLen);
if (action.pRaw == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
action.pRaw = NULL;
} else {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER)
SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
}
}
SDB_GET_INT32(pRaw, dataPos, &pTrans->startFunc, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->stopFunc, _OVER)
......@@ -347,7 +379,6 @@ _OVER:
mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr());
mndTransDropData(pTrans);
taosMemoryFreeClear(pRow);
taosMemoryFreeClear(pData);
taosMemoryFreeClear(action.pCont);
return NULL;
}
......@@ -360,20 +391,16 @@ static const char *mndTransStr(ETrnStage stage) {
switch (stage) {
case TRN_STAGE_PREPARE:
return "prepare";
case TRN_STAGE_REDO_LOG:
return "redoLog";
case TRN_STAGE_REDO_ACTION:
return "redoAction";
case TRN_STAGE_COMMIT:
return "commit";
case TRN_STAGE_COMMIT_LOG:
return "commitLog";
case TRN_STAGE_UNDO_ACTION:
return "undoAction";
case TRN_STAGE_UNDO_LOG:
return "undoLog";
case TRN_STAGE_ROLLBACK:
return "rollback";
case TRN_STAGE_UNDO_ACTION:
return "undoAction";
case TRN_STAGE_COMMIT:
return "commit";
case TRN_STAGE_COMMIT_ACTION:
return "commitAction";
case TRN_STAGE_FINISHED:
return "finished";
default:
......@@ -472,7 +499,7 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) {
return mndTransTestStopFunc;
case TRANS_START_FUNC_MQ_REB:
return mndRebCntInc;
case TRANS_STOP_FUNC_TEST_MQ_REB:
case TRANS_STOP_FUNC_MQ_REB:
return mndRebCntDec;
default:
return NULL;
......@@ -493,11 +520,9 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
}
static void mndTransDropData(STrans *pTrans) {
mndTransDropLogs(pTrans->redoLogs);
mndTransDropLogs(pTrans->undoLogs);
mndTransDropLogs(pTrans->commitLogs);
mndTransDropActions(pTrans->redoActions);
mndTransDropActions(pTrans->undoActions);
mndTransDropActions(pTrans->commitActions);
if (pTrans->rpcRsp != NULL) {
taosMemoryFree(pTrans->rpcRsp);
pTrans->rpcRsp = NULL;
......@@ -511,7 +536,7 @@ static void mndTransDropData(STrans *pTrans) {
}
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
mDebug("trans:%d, perform delete action, row:%p stage:%s callfunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage),
mTrace("trans:%d, perform delete action, row:%p stage:%s callfunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage),
callFunc);
if (pTrans->stopFunc > 0 && callFunc) {
TransCbFp fp = mndTransGetCbFp(pTrans->stopFunc);
......@@ -524,20 +549,35 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
return 0;
}
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
if (pNew->stage == TRN_STAGE_COMMIT) {
pNew->stage = TRN_STAGE_COMMIT_LOG;
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG));
}
if (pNew->stage == TRN_STAGE_ROLLBACK) {
pNew->stage = TRN_STAGE_FINISHED;
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_ROLLBACK), mndTransStr(TRN_STAGE_FINISHED));
static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) {
for (int32_t i = 0; i < taosArrayGetSize(pOldArray); ++i) {
STransAction *pOldAction = taosArrayGet(pOldArray, i);
STransAction *pNewAction = taosArrayGet(pNewArray, i);
pOldAction->rawWritten = pNewAction->rawWritten;
pOldAction->msgSent = pNewAction->msgSent;
pOldAction->msgReceived = pNewAction->msgReceived;
pOldAction->errCode = pNewAction->errCode;
}
}
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld,
mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage));
mndTransUpdateActions(pOld->redoActions, pNew->redoActions);
mndTransUpdateActions(pOld->undoActions, pNew->undoActions);
mndTransUpdateActions(pOld->commitActions, pNew->commitActions);
pOld->stage = pNew->stage;
pOld->redoActionPos = pNew->redoActionPos;
if (pOld->stage == TRN_STAGE_COMMIT) {
pOld->stage = TRN_STAGE_COMMIT_ACTION;
mTrace("trans:%d, stage from commit to commitAction", pNew->id);
}
if (pOld->stage == TRN_STAGE_ROLLBACK) {
pOld->stage = TRN_STAGE_FINISHED;
mTrace("trans:%d, stage from rollback to finished", pNew->id);
}
return 0;
}
......@@ -566,41 +606,33 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy;
pTrans->type = type;
pTrans->parallel = TRN_EXEC_PARALLEL;
pTrans->createdTime = taosGetTimestampMs();
if (pReq != NULL) pTrans->rpcInfo = pReq->info;
pTrans->redoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->commitLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to create transaction since %s", terrstr());
return NULL;
}
mDebug("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
if (pReq != NULL) pTrans->rpcInfo = pReq->info;
mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
return pTrans;
}
static void mndTransDropLogs(SArray *pArray) {
int32_t size = taosArrayGetSize(pArray);
for (int32_t i = 0; i < size; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
sdbFreeRaw(pRaw);
}
taosArrayDestroy(pArray);
}
static void mndTransDropActions(SArray *pArray) {
int32_t size = taosArrayGetSize(pArray);
for (int32_t i = 0; i < size; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
if (pAction->isRaw) {
taosMemoryFreeClear(pAction->pRaw);
} else {
taosMemoryFreeClear(pAction->pCont);
}
}
taosArrayDestroy(pArray);
}
......@@ -608,18 +640,15 @@ static void mndTransDropActions(SArray *pArray) {
void mndTransDrop(STrans *pTrans) {
if (pTrans != NULL) {
mndTransDropData(pTrans);
mDebug("trans:%d, local object is freed, data:%p", pTrans->id, pTrans);
mTrace("trans:%d, local object is freed, data:%p", pTrans->id, pTrans);
taosMemoryFreeClear(pTrans);
}
}
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
if (pArray == NULL || pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
pAction->id = taosArrayGetSize(pArray);
void *ptr = taosArrayPush(pArray, &pRaw);
void *ptr = taosArrayPush(pArray, pAction);
if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -628,27 +657,28 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
return 0;
}
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->redoLogs, pRaw); }
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->undoLogs, pRaw); }
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->commitLogs, pRaw); }
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .isRaw = true, .pRaw = pRaw};
return mndTransAppendAction(pTrans->redoActions, &action);
}
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
void *ptr = taosArrayPush(pArray, pAction);
if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .isRaw = true, .pRaw = pRaw};
return mndTransAppendAction(pTrans->undoActions, &action);
}
return 0;
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .isRaw = true, .pRaw = pRaw};
return mndTransAppendAction(pTrans->commitActions, &action);
}
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
pAction->stage = TRN_STAGE_REDO_ACTION;
return mndTransAppendAction(pTrans->redoActions, pAction);
}
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) {
pAction->stage = TRN_STAGE_UNDO_ACTION;
return mndTransAppendAction(pTrans->undoActions, pAction);
}
......@@ -665,11 +695,10 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *
}
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) {
pTrans->dbUid = pDb->uid;
memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN);
}
void mndTransSetExecOneByOne(STrans *pTrans) { pTrans->parallel = TRN_EXEC_ONE_BY_ONE; }
void mndTransSetNoParallel(STrans *pTrans) { pTrans->parallel = TRN_EXEC_NO_PARALLEL; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
SSdbRaw *pRaw = mndTransActionEncode(pTrans);
......@@ -679,7 +708,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
}
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("trans:%d, sync to other nodes", pTrans->id);
mDebug("trans:%d, sync to other mnodes", pTrans->id);
int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id);
if (code != 0) {
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
......@@ -732,7 +761,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
conflict = true;
} else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
if (pNewTrans->dbUid == pTrans->dbUid) {
if (strcmp(pNewTrans->dbname, pTrans->dbname) == 0) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
conflict = true;
}
......@@ -745,7 +774,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
conflict = true;
} else if (mndIsDbTrans(pTrans)) {
if (pNewTrans->dbUid == pTrans->dbUid) {
if (strcmp(pNewTrans->dbname, pTrans->dbname) == 0) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
conflict = true;
}
......@@ -768,7 +797,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1;
}
if (taosArrayGetSize(pTrans->commitLogs) <= 0) {
if (taosArrayGetSize(pTrans->commitActions) <= 0) {
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1;
......@@ -799,8 +828,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
}
static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
if (taosArrayGetSize(pTrans->commitLogs) == 0 && taosArrayGetSize(pTrans->redoActions) == 0) return 0;
mDebug("trans:%d, commit transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to commit since %s", pTrans->id, terrstr());
......@@ -829,8 +856,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION ||
pTrans->stage == TRN_STAGE_ROLLBACK) {
if (pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) {
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
sendRsp = true;
}
......@@ -848,13 +874,9 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
taosMemoryFree(pTrans->rpcRsp);
mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, code, pTrans->stage, pTrans->rpcInfo.ahandle);
SRpcMsg rspMsg = {
.code = code,
.pCont = rpcCont,
.contLen = pTrans->rpcRspLen,
.info = pTrans->rpcInfo,
};
mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage),
pTrans->rpcInfo.ahandle);
SRpcMsg rspMsg = {.code = code, .pCont = rpcCont, .contLen = pTrans->rpcRspLen, .info = pTrans->rpcInfo};
tmsgSendRsp(&rspMsg);
pTrans->rpcInfo.handle = NULL;
pTrans->rpcRsp = NULL;
......@@ -904,94 +926,57 @@ void mndTransProcessRsp(SRpcMsg *pRsp) {
}
}
mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%04x", transId, action, pRsp->code,
pAction->acceptableCode);
mDebug("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x", transId, mndTransStr(pAction->stage), action,
pRsp->code, pAction->acceptableCode);
mndTransExecute(pMnode, pTrans);
_OVER:
mndReleaseTrans(pMnode, pTrans);
}
static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
SSdb *pSdb = pMnode->pSdb;
int32_t arraySize = taosArrayGetSize(pArray);
if (arraySize == 0) return 0;
int32_t code = 0;
for (int32_t i = 0; i < arraySize; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
if (sdbWriteWithoutFree(pSdb, pRaw) != 0) {
code = ((terrno != 0) ? terrno : -1);
}
}
terrno = code;
return code;
}
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteLogs(pMnode, pTrans->redoLogs);
if (code != 0) {
mError("failed to execute redoLogs since %s", terrstr());
}
return code;
}
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteLogs(pMnode, pTrans->undoLogs);
if (code != 0) {
mError("failed to execute undoLogs since %s, return success", terrstr());
}
return 0; // return success in any case
}
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteLogs(pMnode, pTrans->commitLogs);
if (code != 0) {
mError("failed to execute commitLogs since %s", terrstr());
}
return code;
}
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray);
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue;
if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue;
if (pAction->msgSent && pAction->msgReceived &&
(pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode))
continue;
if (pAction->rawWritten && (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode)) continue;
pAction->rawWritten = 0;
pAction->msgSent = 0;
pAction->msgReceived = 0;
pAction->errCode = 0;
mDebug("trans:%d, action:%d execute status is reset", pTrans->id, action);
mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action);
}
}
static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray);
static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
if (pAction->rawWritten) return 0;
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue;
if (pAction->msgSent) {
if (pAction->msgReceived) {
continue;
} else {
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
break;
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
pAction->rawWritten = true;
pAction->errCode = 0;
code = 0;
mDebug("trans:%d, %s:%d write to sdb", pTrans->id, mndTransStr(pAction->stage), pAction->id);
} else {
continue;
}
}
pAction->errCode = (terrno != 0) ? terrno : code;
mError("trans:%d, %s:%d failed to write sdb since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
terrstr());
}
return code;
}
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
if (pAction->msgSent) return 0;
if (!pMnode->deploy && !mndIsMaster(pMnode)) return -1;
int64_t signature = pTrans->id;
signature = (signature << 32);
signature += action;
signature += pAction->id;
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
......@@ -1001,49 +986,65 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
}
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
if (tmsgSendReq(&pAction->epSet, &rpcMsg) == 0) {
mDebug("trans:%d, action:%d is sent to %s:%u", pTrans->id, action, pAction->epSet.eps[pAction->epSet.inUse].fqdn,
pAction->epSet.eps[pAction->epSet.inUse].port);
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg);
if (code == 0) {
pAction->msgSent = 1;
pAction->msgReceived = 0;
pAction->errCode = 0;
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
break;
}
mDebug("trans:%d, %s:%d is sent to %s:%u", pTrans->id, mndTransStr(pAction->stage), pAction->id,
pAction->epSet.eps[pAction->epSet.inUse].fqdn, pAction->epSet.eps[pAction->epSet.inUse].port);
} else {
pAction->msgSent = 0;
pAction->msgReceived = 0;
pAction->errCode = terrno;
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
return -1;
pAction->errCode = (terrno != 0) ? terrno : code;
mError("trans:%d, %s:%d not send since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, terrstr());
}
return code;
}
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
if (pAction->isRaw) {
return mndTransWriteSingleLog(pMnode, pTrans, pAction);
} else {
return mndTransSendSingleMsg(pMnode, pTrans, pAction);
}
}
return 0;
static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray);
int32_t code = 0;
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
code = mndTransExecSingleAction(pMnode, pTrans, pAction);
if (code != 0) break;
}
return code;
}
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray);
if (numOfActions == 0) return 0;
if (mndTransSendActionMsg(pMnode, pTrans, pArray) != 0) {
if (mndTransExecSingleActions(pMnode, pTrans, pArray) != 0) {
return -1;
}
int32_t numOfReceived = 0;
int32_t numOfExecuted = 0;
int32_t errCode = 0;
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue;
if (pAction->msgSent && pAction->msgReceived) {
numOfReceived++;
if (pAction->msgReceived || pAction->rawWritten) {
numOfExecuted++;
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
errCode = pAction->errCode;
}
}
}
if (numOfReceived == numOfActions) {
if (numOfExecuted == numOfActions) {
if (errCode == 0) {
mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions);
return 0;
......@@ -1054,7 +1055,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
return errCode;
}
} else {
mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfReceived, numOfActions);
mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfExecuted, numOfActions);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
}
......@@ -1075,35 +1076,79 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
return code;
}
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
pTrans->stage = TRN_STAGE_REDO_LOG;
mDebug("trans:%d, stage from prepare to redoLog", pTrans->id);
return continueExec;
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute commitActions since %s", terrstr());
}
return code;
}
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans);
static int32_t mndTransExecuteRedoActionsNoParallel(SMnode *pMnode, STrans *pTrans) {
int32_t code = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
if (numOfActions == 0) return code;
if (pTrans->redoActionPos >= numOfActions) return code;
for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos);
code = mndTransExecSingleAction(pMnode, pTrans, pAction);
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_REDO_ACTION;
mDebug("trans:%d, stage from redoLog to redoAction", pTrans->id);
if (pAction->msgSent) {
if (pAction->msgReceived) {
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
code = pAction->errCode;
}
} else {
pTrans->code = terrno;
pTrans->stage = TRN_STAGE_UNDO_LOG;
mError("trans:%d, stage from redoLog to undoLog since %s", pTrans->id, terrstr());
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
}
if (pAction->rawWritten) {
if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) {
code = pAction->errCode;
}
}
}
if (code == 0) {
pTrans->redoActionPos++;
mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
pAction->id);
code = mndTransSync(pMnode, pTrans);
if (code != 0) {
mError("trans:%d, failed to sync redoActionPos since %s", pTrans->id, terrstr());
break;
}
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id);
break;
} else {
mError("trans:%d, %s:%d failed to execute since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id,
terrstr());
break;
}
}
return code;
}
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
pTrans->stage = TRN_STAGE_REDO_ACTION;
mDebug("trans:%d, stage from prepare to redoAction", pTrans->id);
return continueExec;
}
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
if (!pMnode->deploy && !mndIsMaster(pMnode)) return false;
bool continueExec = true;
int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
int32_t code = 0;
if (pTrans->parallel == TRN_EXEC_NO_PARALLEL) {
code = mndTransExecuteRedoActionsNoParallel(pMnode, pTrans);
} else {
code = mndTransExecuteRedoActions(pMnode, pTrans);
}
if (code == 0) {
pTrans->code = 0;
......@@ -1135,8 +1180,8 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_COMMIT_LOG;
mDebug("trans:%d, stage from commit to commitLog", pTrans->id);
pTrans->stage = TRN_STAGE_COMMIT_ACTION;
mDebug("trans:%d, stage from commit to commitAction", pTrans->id);
continueExec = true;
} else {
pTrans->code = terrno;
......@@ -1155,35 +1200,19 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
return continueExec;
}
static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) {
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans);
int32_t code = mndTransExecuteCommitActions(pMnode, pTrans);
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_FINISHED;
mDebug("trans:%d, stage from commitLog to finished", pTrans->id);
mDebug("trans:%d, stage from commitAction to finished", pTrans->id);
continueExec = true;
} else {
pTrans->code = terrno;
pTrans->failedTimes++;
mError("trans:%d, stage keep on commitLog since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
return continueExec;
}
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteUndoLogs(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_ROLLBACK;
mDebug("trans:%d, stage from undoLog to rollback", pTrans->id);
continueExec = true;
} else {
mError("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr());
mError("trans:%d, stage keep on commitAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
......@@ -1191,14 +1220,12 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
}
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
if (!pMnode->deploy && !mndIsMaster(pMnode)) return false;
bool continueExec = true;
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_UNDO_LOG;
mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id);
pTrans->stage = TRN_STAGE_ROLLBACK;
mDebug("trans:%d, stage from undoAction to rollback", pTrans->id);
continueExec = true;
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
......@@ -1243,8 +1270,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) {
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
}
mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
mDebug("trans:%d, execute finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes);
return continueExec;
}
......@@ -1257,24 +1283,18 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
case TRN_STAGE_PREPARE:
continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
break;
case TRN_STAGE_REDO_LOG:
continueExec = mndTransPerformRedoLogStage(pMnode, pTrans);
break;
case TRN_STAGE_REDO_ACTION:
continueExec = mndTransPerformRedoActionStage(pMnode, pTrans);
break;
case TRN_STAGE_UNDO_LOG:
continueExec = mndTransPerformUndoLogStage(pMnode, pTrans);
case TRN_STAGE_COMMIT:
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
break;
case TRN_STAGE_COMMIT_ACTION:
continueExec = mndTransPerformCommitActionStage(pMnode, pTrans);
break;
case TRN_STAGE_UNDO_ACTION:
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans);
break;
case TRN_STAGE_COMMIT_LOG:
continueExec = mndTransPerformCommitLogStage(pMnode, pTrans);
break;
case TRN_STAGE_COMMIT:
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
break;
case TRN_STAGE_ROLLBACK:
continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
break;
......@@ -1313,15 +1333,15 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
if (pAction == NULL) continue;
if (pAction->msgReceived == 0) {
mInfo("trans:%d, action:%d set processed for kill msg received", pTrans->id, i);
mInfo("trans:%d, %s:%d set processed for kill msg received", pTrans->id, mndTransStr(pAction->stage), i);
pAction->msgSent = 1;
pAction->msgReceived = 1;
pAction->errCode = 0;
}
if (pAction->errCode != 0) {
mInfo("trans:%d, action:%d set processed for kill msg received, errCode from %s to success", pTrans->id, i,
tstrerror(pAction->errCode));
mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id,
mndTransStr(pAction->stage), i, tstrerror(pAction->errCode));
pAction->msgSent = 1;
pAction->msgReceived = 1;
pAction->errCode = 0;
......
......@@ -77,7 +77,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
if (pRaw == NULL) return -1;
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
mDebug("user:%s, will be created while deploy sdb, raw:%p", userObj.user, pRaw);
mDebug("user:%s, will be created when deploying, raw:%p", userObj.user, pRaw);
#if 0
return sdbWrite(pMnode->pSdb, pRaw);
......
......@@ -501,7 +501,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) {
*ppVgroups = pVgroups;
code = 0;
mInfo("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);
mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications);
_OVER:
if (code != 0) taosMemoryFree(pVgroups);
......@@ -539,7 +539,7 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
pDnode->numOfVnodes++;
mInfo("db:%s, vgId:%d, vn:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId);
mInfo("db:%s, vgId:%d, vnode_index:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId);
maxPos++;
if (maxPos == 3) return 0;
}
......
......@@ -168,6 +168,7 @@ typedef struct SSdb {
char *currDir;
char *tmpDir;
int64_t lastCommitVer;
int64_t lastCommitTerm;
int64_t curVer;
int64_t curTerm;
int64_t tableVer[SDB_MAX];
......
......@@ -55,6 +55,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
pSdb->curVer = -1;
pSdb->curTerm = -1;
pSdb->lastCommitVer = -1;
pSdb->lastCommitTerm = -1;
pSdb->pMnode = pOption->pMnode;
taosThreadMutexInit(&pSdb->filelock, NULL);
mDebug("sdb init successfully");
......
......@@ -70,6 +70,7 @@ static void sdbResetData(SSdb *pSdb) {
pSdb->curVer = -1;
pSdb->curTerm = -1;
pSdb->lastCommitVer = -1;
pSdb->lastCommitTerm = -1;
mDebug("sdb reset successfully");
}
......@@ -211,12 +212,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to read file:%s", file);
mDebug("start to read sdb file:%s", file);
SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100);
if (pRaw == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed read file since %s", terrstr());
mError("failed read sdb file since %s", terrstr());
return -1;
}
......@@ -224,12 +225,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
if (pFile == NULL) {
taosMemoryFree(pRaw);
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to read file:%s since %s", file, terrstr());
mError("failed to read sdb file:%s since %s", file, terrstr());
return 0;
}
if (sdbReadFileHead(pSdb, pFile) != 0) {
mError("failed to read file:%s head since %s", file, terrstr());
mError("failed to read sdb file:%s head since %s", file, terrstr());
taosMemoryFree(pRaw);
taosCloseFile(&pFile);
return -1;
......@@ -245,13 +246,13 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno);
mError("failed to read file:%s since %s", file, tstrerror(code));
mError("failed to read sdb file:%s since %s", file, tstrerror(code));
break;
}
if (ret != readLen) {
code = TSDB_CODE_FILE_CORRUPTED;
mError("failed to read file:%s since %s", file, tstrerror(code));
mError("failed to read sdb file:%s since %s", file, tstrerror(code));
break;
}
......@@ -259,34 +260,36 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
ret = taosReadFile(pFile, pRaw->pData, readLen);
if (ret < 0) {
code = TAOS_SYSTEM_ERROR(errno);
mError("failed to read file:%s since %s", file, tstrerror(code));
mError("failed to read sdb file:%s since %s", file, tstrerror(code));
break;
}
if (ret != readLen) {
code = TSDB_CODE_FILE_CORRUPTED;
mError("failed to read file:%s since %s", file, tstrerror(code));
mError("failed to read sdb file:%s since %s", file, tstrerror(code));
break;
}
int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t);
if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) {
code = TSDB_CODE_CHECKSUM_ERROR;
mError("failed to read file:%s since %s", file, tstrerror(code));
mError("failed to read sdb file:%s since %s", file, tstrerror(code));
break;
}
code = sdbWriteWithoutFree(pSdb, pRaw);
if (code != 0) {
mError("failed to read file:%s since %s", file, terrstr());
mError("failed to read sdb file:%s since %s", file, terrstr());
goto _OVER;
}
}
code = 0;
pSdb->lastCommitVer = pSdb->curVer;
pSdb->lastCommitTerm = pSdb->curTerm;
memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer);
mDebug("read sdb file:%s successfully, ver:%" PRId64 " term:%" PRId64, file, pSdb->lastCommitVer,
pSdb->lastCommitTerm);
_OVER:
taosCloseFile(&pFile);
......@@ -302,7 +305,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
sdbResetData(pSdb);
int32_t code = sdbReadFileImp(pSdb);
if (code != 0) {
mError("failed to read sdb since %s", terrstr());
mError("failed to read sdb file since %s", terrstr());
sdbResetData(pSdb);
}
......@@ -318,18 +321,19 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
char curfile[PATH_MAX] = {0};
snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
pSdb->curTerm, pSdb->lastCommitVer);
mDebug("start to write sdb file, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64 " term:%" PRId64
" file:%s",
pSdb->curVer, pSdb->curTerm, pSdb->lastCommitVer, pSdb->lastCommitTerm, curfile);
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
if (pFile == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to open file:%s for write since %s", tmpfile, terrstr());
mError("failed to open sdb file:%s for write since %s", tmpfile, terrstr());
return -1;
}
if (sdbWriteFileHead(pSdb, pFile) != 0) {
mError("failed to write file:%s head since %s", tmpfile, terrstr());
mError("failed to write sdb file:%s head since %s", tmpfile, terrstr());
taosCloseFile(&pFile);
return -1;
}
......@@ -338,7 +342,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
SdbEncodeFp encodeFp = pSdb->encodeFps[i];
if (encodeFp == NULL) continue;
mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
mTrace("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i));
SHashObj *hash = pSdb->hashObjs[i];
TdThreadRwlock *pLock = &pSdb->locks[i];
......@@ -394,7 +398,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
code = taosFsyncFile(pFile);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(errno);
mError("failed to sync file:%s since %s", tmpfile, tstrerror(code));
mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code));
}
}
......@@ -404,15 +408,17 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
code = taosRenameFile(tmpfile, curfile);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(errno);
mError("failed to write file:%s since %s", curfile, tstrerror(code));
mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
}
}
if (code != 0) {
mError("failed to write file:%s since %s", curfile, tstrerror(code));
mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
} else {
pSdb->lastCommitVer = pSdb->curVer;
mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm);
pSdb->lastCommitTerm = pSdb->curTerm;
mDebug("write sdb file successfully, ver:%" PRId64 " term:%" PRId64 " file:%s", pSdb->lastCommitVer,
pSdb->lastCommitTerm, curfile);
}
terrno = code;
......@@ -427,7 +433,7 @@ int32_t sdbWriteFile(SSdb *pSdb) {
taosThreadMutexLock(&pSdb->filelock);
int32_t code = sdbWriteFileImp(pSdb);
if (code != 0) {
mError("failed to write sdb since %s", terrstr());
mError("failed to write sdb file since %s", terrstr());
}
taosThreadMutexUnlock(&pSdb->filelock);
return code;
......@@ -493,7 +499,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
if (taosCopyFile(datafile, pIter->name) < 0) {
taosThreadMutexUnlock(&pSdb->filelock);
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to copy file %s to %s since %s", datafile, pIter->name, terrstr());
mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, terrstr());
sdbCloseIter(pIter);
return -1;
}
......@@ -502,7 +508,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
pIter->file = taosOpenFile(pIter->name, TD_FILE_READ);
if (pIter->file == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
mError("failed to open file:%s since %s", pIter->name, terrstr());
mError("failed to open sdb file:%s since %s", pIter->name, terrstr());
sdbCloseIter(pIter);
return -1;
}
......
......@@ -79,7 +79,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) {
if (taskHandle) {
code = qExecTask(taskHandle, &pRes, &useconds);
if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
} else {
QW_TASK_DLOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
}
QW_ERR_RET(code);
}
}
......
......@@ -494,6 +494,7 @@ class TDDnodes:
self.simDeployed = False
self.testCluster = False
self.valgrind = 0
self.killValgrind = 1
def init(self, path, remoteIP = ""):
psCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}'"
......@@ -505,6 +506,7 @@ class TDDnodes:
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8")
if self.killValgrind == 1:
psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID):
......@@ -549,6 +551,9 @@ class TDDnodes:
def setValgrind(self, value):
self.valgrind = value
def setKillValgrind(self, value):
self.killValgrind = value
def deploy(self, index, *updatecfgDict):
self.sim.setTestCluster(self.testCluster)
......@@ -622,6 +627,7 @@ class TDDnodes:
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8")
if self.killValgrind == 1:
psCmd = "ps -ef|grep -w valgrind.bin| grep -v grep | awk '{print $2}'"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
while(processID):
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.ts = 1537146000000
self.param_list = ['LT','lt','Lt','lT','GT','gt','Gt','gT','LE','le','Le','lE','GE','ge','Ge','gE','NE','ne','Ne','nE','EQ','eq','Eq','eQ']
self.row_num = 10
def run(self):
tdSql.prepare()
# timestamp = 1ms , time_unit = 1s
tdSql.execute('''create table test(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
for i in range(self.row_num):
tdSql.execute("insert into test values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
integer_list = [1,2,3,4,11,12,13,14]
float_list = [5,6]
for i in integer_list:
for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5) from test")
tdSql.checkRows(10)
if j in ['LT' ,'lt','Lt','lT']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GT','gt', 'Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,)])
elif j in ['LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in [ 'GE','ge','Ge','gE']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,), (0,)])
elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (-1,), (0,), (0,), (0,), (0,), (0,)])
elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list:
for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5) from test")
tdSql.checkRows(10)
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (0,), (0,), (0,), (0,)])
elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,), (0,)])
elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
error_column_list = ['ts','col7','col8','col9','a',1]
for i in error_column_list:
for j in self.param_list:
tdSql.error(f"select stateduration({i},{j},5) from test")
error_param_list = ['a',1]
for i in error_param_list:
tdSql.error(f"select stateduration(col1,{i},5) from test")
# timestamp = 1s, time_unit =1s
tdSql.execute('''create table test1(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
for i in range(self.row_num):
tdSql.execute("insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i*1000, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
for i in integer_list:
for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5) from test1")
tdSql.checkRows(10)
# print(tdSql.queryResult)
if j in ['LT' ,'lt','Lt','lT']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GT','gt', 'Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in [ 'GE','ge','Ge','gE']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list:
for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5) from test1")
tdSql.checkRows(10)
print(tdSql.queryResult)
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
# timestamp = 1m, time_unit =1m
tdSql.execute('''create table test2(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
for i in range(self.row_num):
tdSql.execute("insert into test2 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i*1000*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
for i in integer_list:
for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5,1m) from test2")
tdSql.checkRows(10)
# print(tdSql.queryResult)
if j in ['LT' ,'lt','Lt','lT']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GT','gt', 'Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in [ 'GE','ge','Ge','gE']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list:
for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5,1m) from test2")
tdSql.checkRows(10)
print(tdSql.queryResult)
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
# timestamp = 1h, time_unit =1h
tdSql.execute('''create table test3(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned)''')
for i in range(self.row_num):
tdSql.execute("insert into test3 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i*1000*60*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
for i in integer_list:
for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from test3")
tdSql.checkRows(10)
# print(tdSql.queryResult)
if j in ['LT' ,'lt','Lt','lT']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GT','gt', 'Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in [ 'GE','ge','Ge','gE']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list:
for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from test3")
tdSql.checkRows(10)
print(tdSql.queryResult)
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
# timestamp = 1h,time_unit =1m
for i in integer_list:
for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5,1m) from test3")
tdSql.checkRows(10)
# print(tdSql.queryResult)
if j in ['LT' ,'lt','Lt','lT']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (60,), (120,), (180,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GT','gt', 'Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (60,), (120,), (180,), (240,)])
elif j in ['LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (60,), (120,), (180,), (240,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in [ 'GE','ge','Ge','gE']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (60,), (120,), (180,), (240,), (300,)])
elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (60,), (120,), (180,), (-1,), (0,), (60,), (120,), (180,), (240,)])
elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list:
for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5,1m) from test3")
tdSql.checkRows(10)
print(tdSql.queryResult)
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (60,), (120,), (180,), (240,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (60,), (120,), (180,), (240,)])
elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (60,), (120,), (180,), (240,), (300,), (360,), (420,), (480,), (540,)])
elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
# for stb
tdSql.execute('''create table stb(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(t0 int)''')
tdSql.execute('create table stb_1 using stb tags(1)')
for i in range(self.row_num):
tdSql.execute("insert into stb_1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
% (self.ts + i*1000*60*60, i + 1, i + 1, i + 1, i + 1, i + 0.1, i + 0.1, i % 2, i + 1, i + 1, i + 1, i + 1, i + 1, i + 1))
for i in integer_list:
for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from stb")
tdSql.checkRows(10)
# print(tdSql.queryResult)
if j in ['LT' ,'lt','Lt','lT']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GT','gt', 'Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in [ 'GE','ge','Ge','gE']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,), (5,)])
elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (0,), (-1,), (-1,), (-1,), (-1,), (-1,)])
for i in float_list:
for j in self.param_list:
tdSql.query(f"select stateduration(col{i},'{j}',5,1h) from stb")
tdSql.checkRows(10)
print(tdSql.queryResult)
if j in ['LT','lt','Lt','lT','LE','le','Le','lE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (-1,), (-1,), (-1,), (-1,), (-1,)])
elif j in ['GE','ge','Ge','gE','GT','gt','Gt','gT']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (0,), (1,), (2,), (3,), (4,)])
elif j in ['NE','ne','Ne','nE']:
tdSql.checkEqual(tdSql.queryResult,[(0,), (1,), (2,), (3,), (4,), (5,), (6,), (7,), (8,), (9,)])
elif j in ['EQ','eq','Eq','eQ']:
tdSql.checkEqual(tdSql.queryResult,[(-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,), (-1,)])
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
\ No newline at end of file
......@@ -55,8 +55,8 @@ python3 ./test.py -f 2-query/Timediff.py
python3 ./test.py -f 2-query/top.py
python3 ./test.py -f 2-query/bottom.py
python3 ./test.py -f 2-query/percentile.py
python3 ./test.py -f 2-query/apercentile.py
python3 ./test.py -f 2-query/abs.py
python3 ./test.py -f 2-query/ceil.py
python3 ./test.py -f 2-query/floor.py
......@@ -83,6 +83,7 @@ python3 ./test.py -f 2-query/diff.py
python3 ./test.py -f 2-query/sample.py
python3 ./test.py -f 2-query/function_diff.py
python3 ./test.py -f 2-query/unique.py
python3 ./test.py -f 2-query/stateduration.py
python3 ./test.py -f 7-tmq/basic5.py
python3 ./test.py -f 7-tmq/subscribeDb.py
......
......@@ -37,6 +37,7 @@ if __name__ == "__main__":
masterIp = ""
testCluster = False
valgrind = 0
killValgrind = 1
logSql = True
stop = 0
restart = False
......@@ -45,8 +46,8 @@ if __name__ == "__main__":
windows = 1
updateCfgDict = {}
execCmd = ""
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:e:', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'execCmd'])
opts, args = getopt.gnu_getopt(sys.argv[1:], 'f:p:m:l:scghrd:k:e:', [
'file=', 'path=', 'master', 'logSql', 'stop', 'cluster', 'valgrind', 'help', 'restart', 'updateCfgDict', 'killv', 'execCmd'])
for key, value in opts:
if key in ['-h', '--help']:
tdLog.printNoPrefix(
......@@ -60,6 +61,7 @@ if __name__ == "__main__":
tdLog.printNoPrefix('-g valgrind Test Flag')
tdLog.printNoPrefix('-r taosd restart test')
tdLog.printNoPrefix('-d update cfg dict, base64 json str')
tdLog.printNoPrefix('-k not kill valgrind processer')
tdLog.printNoPrefix('-e eval str to run')
sys.exit(0)
......@@ -100,6 +102,9 @@ if __name__ == "__main__":
print('updateCfgDict convert fail.')
sys.exit(0)
if key in ['-k', '--killValgrind']:
killValgrind = 0
if key in ['-e', '--execCmd']:
try:
execCmd = base64.b64decode(value.encode()).decode()
......@@ -189,6 +194,7 @@ if __name__ == "__main__":
else:
tdCases.runAllWindows(conn)
else:
tdDnodes.setKillValgrind(killValgrind)
tdDnodes.init(deployPath, masterIp)
tdDnodes.setTestCluster(testCluster)
tdDnodes.setValgrind(valgrind)
......
......@@ -283,9 +283,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
tjsonAddStringToObject(item, "dbname", pObj->dbname);
tjsonAddIntegerToObject(item, "redoLogNum", taosArrayGetSize(pObj->redoLogs));
tjsonAddIntegerToObject(item, "undoLogNum", taosArrayGetSize(pObj->undoLogs));
tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitLogs));
tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions));
tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions));
tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册