提交 9a0d27c6 编写于 作者: S Shengliang Guan

TD-10431 test for create user

上级 2348b6e2
...@@ -379,41 +379,41 @@ TEST_F(DndTestDnode, RestartDnode_01) { ...@@ -379,41 +379,41 @@ TEST_F(DndTestDnode, RestartDnode_01) {
const char* fqdn = "localhost"; const char* fqdn = "localhost";
const char* firstEp = "localhost:9521"; const char* firstEp = "localhost:9521";
pServer1 = startServer("/tmp/dndTestDnode1", fqdn, 9521, firstEp); pServer1 = startServer("/tmp/dndTestDnode1", fqdn, 9521, firstEp);
// pServer1 = startServer("/tmp/dndTestDnode3", fqdn, 9523, firstEp); pServer3 = startServer("/tmp/dndTestDnode3", fqdn, 9523, firstEp);
// pServer1 = startServer("/tmp/dndTestDnode4", fqdn, 9524, firstEp); pServer4 = startServer("/tmp/dndTestDnode4", fqdn, 9524, firstEp);
// pServer1 = startServer("/tmp/dndTestDnode5", fqdn, 9525, firstEp); pServer5 = startServer("/tmp/dndTestDnode5", fqdn, 9525, firstEp);
uInfo("all server is running"); uInfo("all server is running");
// taosMsleep(1300); taosMsleep(1300);
// SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7); SendTheCheckShowMetaMsg(TSDB_MGMT_TABLE_DNODE, "show dnodes", 7);
// SendThenCheckShowRetrieveMsg(4); SendThenCheckShowRetrieveMsg(4);
// CheckInt16(1); CheckInt16(1);
// CheckInt16(3); CheckInt16(3);
// CheckInt16(4); CheckInt16(4);
// CheckInt16(5); CheckInt16(5);
// CheckBinary("localhost:9521", TSDB_EP_LEN); CheckBinary("localhost:9521", TSDB_EP_LEN);
// CheckBinary("localhost:9523", TSDB_EP_LEN); CheckBinary("localhost:9523", TSDB_EP_LEN);
// CheckBinary("localhost:9524", TSDB_EP_LEN); CheckBinary("localhost:9524", TSDB_EP_LEN);
// CheckBinary("localhost:9525", TSDB_EP_LEN); CheckBinary("localhost:9525", TSDB_EP_LEN);
// CheckInt16(0); CheckInt16(0);
// CheckInt16(0); CheckInt16(0);
// CheckInt16(0); CheckInt16(0);
// CheckInt16(0); CheckInt16(0);
// CheckInt16(1); CheckInt16(1);
// CheckInt16(1); CheckInt16(1);
// CheckInt16(1); CheckInt16(1);
// CheckInt16(1); CheckInt16(1);
// CheckBinary("ready", 10); CheckBinary("ready", 10);
// CheckBinary("ready", 10); CheckBinary("ready", 10);
// CheckBinary("ready", 10); CheckBinary("ready", 10);
// CheckBinary("ready", 10); CheckBinary("ready", 10);
// CheckTimestamp(); CheckTimestamp();
// CheckTimestamp(); CheckTimestamp();
// CheckTimestamp(); CheckTimestamp();
// CheckTimestamp(); CheckTimestamp();
// CheckBinary("", 24); CheckBinary("", 24);
// CheckBinary("", 24); CheckBinary("", 24);
// CheckBinary("", 24); CheckBinary("", 24);
// CheckBinary("", 24); CheckBinary("", 24);
} }
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
void initLog(const char* path) { void initLog(const char* path) {
dDebugFlag = 143; dDebugFlag = 143;
vDebugFlag = 0; vDebugFlag = 0;
mDebugFlag = 143; mDebugFlag = 207;
cDebugFlag = 0; cDebugFlag = 0;
jniDebugFlag = 0; jniDebugFlag = 0;
tmrDebugFlag = 0; tmrDebugFlag = 0;
......
...@@ -73,7 +73,8 @@ typedef enum { ...@@ -73,7 +73,8 @@ typedef enum {
TRN_STAGE_EXECUTE = 2, TRN_STAGE_EXECUTE = 2,
TRN_STAGE_COMMIT = 3, TRN_STAGE_COMMIT = 3,
TRN_STAGE_ROLLBACK = 4, TRN_STAGE_ROLLBACK = 4,
TRN_STAGE_RETRY = 5 TRN_STAGE_RETRY = 5,
TRN_STAGE_OVER = 6,
} ETrnStage; } ETrnStage;
typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy; typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy;
......
...@@ -58,7 +58,7 @@ int32_t mndInitTrans(SMnode *pMnode) { ...@@ -58,7 +58,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
void mndCleanupTrans(SMnode *pMnode) {} void mndCleanupTrans(SMnode *pMnode) {}
static SSdbRaw *mndTransActionEncode(STrans *pTrans) { static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
int32_t rawDataLen = 16 * sizeof(int32_t); int32_t rawDataLen = 16 * sizeof(int32_t) + TSDB_TRN_RESERVE_SIZE;
int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
...@@ -88,7 +88,6 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { ...@@ -88,7 +88,6 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pTrans->id) SDB_SET_INT32(pRaw, dataPos, pTrans->id)
SDB_SET_INT8(pRaw, dataPos, pTrans->stage)
SDB_SET_INT8(pRaw, dataPos, pTrans->policy) SDB_SET_INT8(pRaw, dataPos, pTrans->policy)
SDB_SET_INT32(pRaw, dataPos, redoLogNum) SDB_SET_INT32(pRaw, dataPos, redoLogNum)
SDB_SET_INT32(pRaw, dataPos, undoLogNum) SDB_SET_INT32(pRaw, dataPos, undoLogNum)
...@@ -166,7 +165,6 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -166,7 +165,6 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id) SDB_GET_INT32(pRaw, pRow, dataPos, &pTrans->id)
SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->stage)
SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->policy) SDB_GET_INT8(pRaw, pRow, dataPos, (int8_t *)&pTrans->policy)
SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum) SDB_GET_INT32(pRaw, pRow, dataPos, &redoLogNum)
SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum) SDB_GET_INT32(pRaw, pRow, dataPos, &undoLogNum)
...@@ -231,6 +229,7 @@ TRANS_DECODE_OVER: ...@@ -231,6 +229,7 @@ TRANS_DECODE_OVER:
} }
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
pTrans->stage = TRN_STAGE_PREPARE;
mTrace("trans:%d, perform insert action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage)); mTrace("trans:%d, perform insert action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage));
return 0; return 0;
} }
...@@ -263,7 +262,7 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { ...@@ -263,7 +262,7 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
sdbRelease(pSdb, pTrans); sdbRelease(pSdb, pTrans);
} }
static int32_t trnGenerateTransId() { static int32_t mndGenerateTransId() {
static int32_t tmp = 0; static int32_t tmp = 0;
return ++tmp; return ++tmp;
} }
...@@ -304,7 +303,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { ...@@ -304,7 +303,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
return NULL; return NULL;
} }
pTrans->id = trnGenerateTransId(); pTrans->id = mndGenerateTransId();
pTrans->stage = TRN_STAGE_PREPARE; pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy; pTrans->policy = policy;
pTrans->rpcHandle = rpcHandle; pTrans->rpcHandle = rpcHandle;
...@@ -428,6 +427,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { ...@@ -428,6 +427,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
} }
mDebug("trans:%d, prepare finished", pNewTrans->id); mDebug("trans:%d, prepare finished", pNewTrans->id);
pNewTrans->rpcHandle = pTrans->rpcHandle;
mndTransExecute(pMnode, pNewTrans); mndTransExecute(pMnode, pNewTrans);
mndReleaseTrans(pMnode, pNewTrans); mndReleaseTrans(pMnode, pNewTrans);
return 0; return 0;
...@@ -443,19 +443,21 @@ int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) { ...@@ -443,19 +443,21 @@ int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
} }
sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED);
mTrace("trans:%d, start sync", pTrans->id); if (taosArrayGetSize(pTrans->commitLogs) != 0) {
int32_t code = mndSyncPropose(pMnode, pRaw); mTrace("trans:%d, start sync", pTrans->id);
if (code != 0) { int32_t code = mndSyncPropose(pMnode, pRaw);
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); if (code != 0) {
sdbFreeRaw(pRaw); mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
return -1; sdbFreeRaw(pRaw);
} return -1;
}
mTrace("trans:%d, sync finished", pTrans->id); mTrace("trans:%d, sync finished", pTrans->id);
code = sdbWrite(pMnode->pSdb, pRaw); code = sdbWrite(pMnode->pSdb, pRaw);
if (code != 0) { if (code != 0) {
mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr());
return -1; return -1;
}
} }
mDebug("trans:%d, commit finished", pTrans->id); mDebug("trans:%d, commit finished", pTrans->id);
...@@ -521,45 +523,58 @@ static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) { ...@@ -521,45 +523,58 @@ static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) {
} }
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteArray(pMnode, pTrans->redoLogs); int32_t code = 0;
if (code != 0) { if (taosArrayGetSize(pTrans->redoLogs) != 0) {
mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr()) code = mndTransExecuteArray(pMnode, pTrans->redoLogs);
} else { if (code != 0) {
mTrace("trans:%d, execute redo logs finished", pTrans->id) mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr())
} else {
mTrace("trans:%d, execute redo logs finished", pTrans->id)
}
} }
return code; return code;
} }
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteArray(pMnode, pTrans->undoLogs); int32_t code = 0;
if (code != 0) { if (taosArrayGetSize(pTrans->undoLogs) != 0) {
mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr()) code = mndTransExecuteArray(pMnode, pTrans->undoLogs);
} else { if (code != 0) {
mTrace("trans:%d, execute undo logs finished", pTrans->id) mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr())
} else {
mTrace("trans:%d, execute undo logs finished", pTrans->id)
}
} }
return code; return code;
} }
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteArray(pMnode, pTrans->commitLogs); int32_t code = 0;
if (code != 0) { if (taosArrayGetSize(pTrans->commitLogs) != 0) {
mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr()) code = mndTransExecuteArray(pMnode, pTrans->commitLogs);
} else { if (code != 0) {
mTrace("trans:%d, execute commit logs finished", pTrans->id) mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr())
} else {
mTrace("trans:%d, execute commit logs finished", pTrans->id)
}
} }
return code; return code;
} }
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) {
mTrace("trans:%d, execute redo actions finished", pTrans->id); if (taosArrayGetSize(pTrans->redoActions) != 0) {
mTrace("trans:%d, execute redo actions finished", pTrans->id);
}
return 0; return 0;
} }
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
mTrace("trans:%d, execute undo actions finished", pTrans->id); if (taosArrayGetSize(pTrans->undoActions) != 0) {
mTrace("trans:%d, execute undo actions finished", pTrans->id);
}
return 0; return 0;
} }
...@@ -603,7 +618,7 @@ static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { ...@@ -603,7 +618,7 @@ static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans); int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans);
if (code == 0) { if (code == 0) {
pTrans->stage = TRN_STAGE_COMMIT; pTrans->stage = TRN_STAGE_OVER;
mTrace("trans:%d, commit stage finished", pTrans->id); mTrace("trans:%d, commit stage finished", pTrans->id);
} else { } else {
if (pTrans->policy == TRN_POLICY_ROLLBACK) { if (pTrans->policy == TRN_POLICY_ROLLBACK) {
...@@ -671,6 +686,9 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { ...@@ -671,6 +686,9 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
case TRN_STAGE_RETRY: case TRN_STAGE_RETRY:
code = mndTransPerformRetryStage(pMnode, pTrans); code = mndTransPerformRetryStage(pMnode, pTrans);
break; break;
default:
mndTransSendRpcRsp(pTrans, 0);
return;
} }
} }
......
...@@ -217,23 +217,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, ...@@ -217,23 +217,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
} }
sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING); sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
SSdbRaw *pUndoRaw = mndUserActionEncode(&userObj);
if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) {
mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED);
SSdbRaw *pCommitRaw = mndUserActionEncode(&userObj);
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
......
...@@ -132,11 +132,6 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow * ...@@ -132,11 +132,6 @@ static int32_t sdbDeleteRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
taosHashRemove(hash, pOldRow->pObj, keySize); taosHashRemove(hash, pOldRow->pObj, keySize);
taosWUnLockLatch(pLock); taosWUnLockLatch(pLock);
SdbDeleteFp deleteFp = pSdb->deleteFps[pOldRow->type];
if (deleteFp != NULL) {
code = (*deleteFp)(pSdb, pOldRow->pObj);
}
sdbRelease(pSdb, pOldRow->pObj); sdbRelease(pSdb, pOldRow->pObj);
sdbFreeRow(pRow); sdbFreeRow(pRow);
return code; return code;
...@@ -161,6 +156,7 @@ int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) { ...@@ -161,6 +156,7 @@ int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw) {
case SDB_STATUS_CREATING: case SDB_STATUS_CREATING:
code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize); code = sdbInsertRow(pSdb, hash, pRaw, pRow, keySize);
break; break;
case SDB_STATUS_UPDATING:
case SDB_STATUS_READY: case SDB_STATUS_READY:
case SDB_STATUS_DROPPING: case SDB_STATUS_DROPPING:
code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize); code = sdbUpdateRow(pSdb, hash, pRaw, pRow, keySize);
...@@ -228,6 +224,11 @@ void sdbRelease(SSdb *pSdb, void *pObj) { ...@@ -228,6 +224,11 @@ void sdbRelease(SSdb *pSdb, void *pObj) {
int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1); int32_t ref = atomic_sub_fetch_32(&pRow->refCount, 1);
if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) { if (ref <= 0 && pRow->status == SDB_STATUS_DROPPED) {
SdbDeleteFp deleteFp = pSdb->deleteFps[pRow->type];
if (deleteFp != NULL) {
(*deleteFp)(pSdb, pRow->pObj);
}
sdbFreeRow(pRow); sdbFreeRow(pRow);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册