提交 64556c9f 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

......@@ -66,11 +66,13 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans);
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq,
const char *opername);
void mndTransDrop(STrans *pTrans);
int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendNullLog(STrans *pTrans);
int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
......
......@@ -37,7 +37,7 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups);
int32_t mndAddPrepareNewVgAction(SMnode *, STrans *pTrans, SVgObj *pVg);
int32_t mndAddNewVgPrepareAction(SMnode *, STrans *pTrans, SVgObj *pVg);
int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid);
int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup);
int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType);
......
......@@ -37,6 +37,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw);
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb);
static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb);
static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew);
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj);
static int32_t mndProcessCreateDbReq(SRpcMsg *pReq);
static int32_t mndProcessAlterDbReq(SRpcMsg *pReq);
static int32_t mndProcessDropDbReq(SRpcMsg *pReq);
......@@ -59,6 +61,7 @@ int32_t mndInitDb(SMnode *pMnode) {
.insertFp = (SdbInsertFp)mndDbActionInsert,
.updateFp = (SdbUpdateFp)mndDbActionUpdate,
.deleteFp = (SdbDeleteFp)mndDbActionDelete,
.validateFp = (SdbValidateFp)mndNewDbActionValidate,
};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq);
......@@ -247,6 +250,19 @@ _OVER:
return pRow;
}
static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) {
SDbObj *pNewDb = pObj;
SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name);
if (pOldDb != NULL) {
mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name);
sdbRelease(pMnode->pSdb, pOldDb);
return -1;
}
return 0;
}
static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) {
mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb);
return 0;
......@@ -448,9 +464,18 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE;
}
static int32_t mndSetPrepareNewVgActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
static int32_t mndSetCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL) return -1;
if (mndTransAppendPrepareLog(pTrans, pDbRaw) != 0) return -1;
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
return 0;
}
static int32_t mndSetNewVgPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) {
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
if (mndAddPrepareNewVgAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1;
if (mndAddNewVgPrepareAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1;
}
return 0;
}
......@@ -459,7 +484,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
SSdbRaw *pDbRaw = mndDbActionEncode(pDb);
if (pDbRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1;
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1;
if (sdbSetRawStatus(pDbRaw, SDB_STATUS_UPDATE) != 0) return -1;
for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) {
SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v);
......@@ -633,8 +658,8 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
mndTransSetOper(pTrans, MND_OPER_CREATE_DB);
if (mndSetPrepareNewVgActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbPrepareAction(pMnode, pTrans, &dbObj) != 0) goto _OVER;
if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER;
if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
......
......@@ -631,7 +631,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name);
if (mndAddPrepareNewVgAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndAddNewVgPrepareAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
......
......@@ -17,7 +17,6 @@
#include "mndSync.h"
#include "mndCluster.h"
#include "mndTrans.h"
#include "mndVgroup.h"
static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
if (pMsg == NULL || pMsg->pCont == NULL) {
......@@ -75,25 +74,25 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
}
static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
SSdbRaw *pRaw = pAction->pRaw;
SSdb *pSdb = pMnode->pSdb;
SSdbRow *pRow = NULL;
int32_t code = -1;
void *pObj = NULL;
int code = -1;
if (pAction->msgType == TDMT_MND_CREATE_VG) {
pRow = mndVgroupActionDecode(pAction->pRaw);
if (pRow == NULL) goto _OUT;
if (pRaw->status != SDB_STATUS_CREATING) goto _OUT;
SVgObj *pVgroup = sdbGetRowObj(pRow);
if (pVgroup == NULL) goto _OUT;
pRow = (pSdb->decodeFps[pRaw->type])(pRaw);
if (pRow == NULL) goto _OUT;
pObj = sdbGetRowObj(pRow);
if (pObj == NULL) goto _OUT;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (maxVgId > pVgroup->vgId) {
mError("trans:%d, failed to satisfy vgroup id %d of prepare action. maxVgId:%d", pTrans->id, pVgroup->vgId,
maxVgId);
goto _OUT;
}
SdbValidateFp validateFp = pSdb->validateFps[pRaw->type];
code = 0;
if (validateFp) {
code = validateFp(pMnode, pTrans, pObj);
}
code = 0;
_OUT:
taosMemoryFreeClear(pRow);
return code;
......
......@@ -654,11 +654,10 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
return mndTransAppendAction(pTrans->commitActions, &action);
}
int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction) {
pAction->stage = TRN_STAGE_PREPARE;
pAction->actionType = TRANS_ACTION_RAW;
pAction->mTraceId = pTrans->mTraceId;
return mndTransAppendAction(pTrans->prepareActions, pAction);
int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {
.pRaw = pRaw, .stage = TRN_STAGE_PREPARE, .actionType = TRANS_ACTION_RAW, .mTraceId = pTrans->mTraceId};
return mndTransAppendAction(pTrans->prepareActions, &action);
}
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
......
......@@ -33,6 +33,7 @@
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup);
static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew);
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj);
static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter);
......@@ -53,6 +54,7 @@ int32_t mndInitVgroup(SMnode *pMnode) {
.insertFp = (SdbInsertFp)mndVgroupActionInsert,
.updateFp = (SdbUpdateFp)mndVgroupActionUpdate,
.deleteFp = (SdbDeleteFp)mndVgroupActionDelete,
.validateFp = (SdbValidateFp)mndNewVgActionValidate,
};
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp);
......@@ -171,6 +173,17 @@ _OVER:
return pRow;
}
static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) {
SVgObj *pVgroup = pObj;
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
if (maxVgId > pVgroup->vgId) {
mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId);
return -1;
}
return 0;
}
static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) {
mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup);
return 0;
......@@ -1259,12 +1272,11 @@ int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb
return 0;
}
int32_t mndAddPrepareNewVgAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) {
SSdbRaw *pRaw = mndVgroupActionEncode(pVg);
if (pRaw == NULL) goto _err;
STransAction action = {.pRaw = pRaw, .msgType = TDMT_MND_CREATE_VG};
if (mndTransAppendPrepareAction(pTrans, &action) != 0) goto _err;
if (mndTransAppendPrepareLog(pTrans, pRaw) != 0) goto _err;
(void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING);
pRaw = NULL;
return 0;
......@@ -2380,13 +2392,13 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP);
int32_t srcVgId = newVg1.vgId;
newVg1.vgId = maxVgId;
if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg1) != 0) goto _OVER;
if (mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1) != 0) goto _OVER;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1) != 0) goto _OVER;
maxVgId++;
srcVgId = newVg2.vgId;
newVg2.vgId = maxVgId;
if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg2) != 0) goto _OVER;
if (mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2) != 0) goto _OVER;
if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER;
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER;
......
......@@ -106,6 +106,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj);
typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj);
typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc);
typedef int32_t (*SdbDeployFp)(SMnode *pMnode);
typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, void *pObj);
typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw);
typedef SSdbRaw *(*SdbEncodeFp)(void *pObj);
typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3);
......@@ -189,6 +190,7 @@ typedef struct SSdb {
SdbDeployFp deployFps[SDB_MAX];
SdbEncodeFp encodeFps[SDB_MAX];
SdbDecodeFp decodeFps[SDB_MAX];
SdbValidateFp validateFps[SDB_MAX];
TdThreadMutex filelock;
} SSdb;
......@@ -207,6 +209,7 @@ typedef struct {
SdbInsertFp insertFp;
SdbUpdateFp updateFp;
SdbDeleteFp deleteFp;
SdbValidateFp validateFp;
} SSdbTable;
typedef struct SSdbOpt {
......
......@@ -121,6 +121,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
pSdb->deployFps[sdbType] = table.deployFp;
pSdb->encodeFps[sdbType] = table.encodeFp;
pSdb->decodeFps[sdbType] = table.decodeFp;
pSdb->validateFps[sdbType] = table.validateFp;
int32_t hashType = 0;
if (keyType == SDB_KEY_INT32) {
......
......@@ -79,6 +79,8 @@ const char *sdbStatusName(ESdbStatus status) {
return "dropped";
case SDB_STATUS_INIT:
return "init";
case SDB_STATUS_UPDATE:
return "update";
default:
return "undefine";
}
......
......@@ -19,10 +19,22 @@
extern int vnodeScheduleTask(int (*execute)(void *), void *arg);
extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg);
extern void remove_file(const char *fname);
#define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT
#define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1)
typedef struct STFileHashEntry {
struct STFileHashEntry *next;
char fname[TSDB_FILENAME_LEN];
} STFileHashEntry;
typedef struct {
int32_t numFile;
int32_t numBucket;
STFileHashEntry **buckets;
} STFileHash;
enum {
TSDB_FS_STATE_NONE = 0,
TSDB_FS_STATE_OPEN,
......@@ -315,10 +327,8 @@ _exit:
}
// static int32_t
static int32_t apply_abort(STFileSystem *fs) {
// TODO
return 0;
}
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs);
static int32_t apply_abort(STFileSystem *fs) { return tsdbFSDoSanAndFix(fs); }
static int32_t abort_edit(STFileSystem *fs) {
char fname[TSDB_FILENAME_LEN];
......@@ -349,6 +359,180 @@ _exit:
return code;
}
static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) {
int32_t code = 0;
int32_t lino = 0;
// check file existence
if (!taosCheckExistFile(fobj->fname)) {
code = TSDB_CODE_FILE_CORRUPTED;
tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, fobj->fname);
return code;
}
{ // TODO: check file size
// int64_t fsize;
// if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
// code = TAOS_SYSTEM_ERROR(terrno);
// tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(fs->tsdb->pVnode), __func__,
// fobj->fname, tstrerror(code));
// return code;
// }
}
return 0;
}
static void tsdbFSDestroyFileObjHash(STFileHash *hash);
static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname) {
STFileHashEntry *entry = taosMemoryMalloc(sizeof(*entry));
if (entry == NULL) return TSDB_CODE_OUT_OF_MEMORY;
strcpy(entry->fname, fname);
uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
entry->next = hash->buckets[idx];
hash->buckets[idx] = entry;
hash->numFile++;
return 0;
}
static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
int32_t code = 0;
char fname[TSDB_FILENAME_LEN];
// init hash table
hash->numFile = 0;
hash->numBucket = 4096;
hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *));
if (hash->buckets == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
return code;
}
// vnode.json
current_fname(fs->tsdb, fname, TSDB_FCURRENT);
code = tsdbFSAddEntryToFileObjHash(hash, fname);
if (code) goto _exit;
// other
STFileSet *fset = NULL;
TARRAY2_FOREACH(fs->fSetArr, fset) {
// data file
for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
if (fset->farr[i] != NULL) {
code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname);
if (code) goto _exit;
}
}
// stt file
SSttLvl *lvl = NULL;
TARRAY2_FOREACH(fset->lvlArr, lvl) {
STFileObj *fobj;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname);
if (code) goto _exit;
}
}
}
_exit:
if (code) {
tsdbFSDestroyFileObjHash(hash);
}
return code;
}
static const STFileHashEntry *tsdbFSGetFileObjHashEntry(STFileHash *hash, const char *fname) {
uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket;
STFileHashEntry *entry = hash->buckets[idx];
while (entry) {
if (strcmp(entry->fname, fname) == 0) {
return entry;
}
entry = entry->next;
}
return NULL;
}
static void tsdbFSDestroyFileObjHash(STFileHash *hash) {
for (int32_t i = 0; i < hash->numBucket; i++) {
STFileHashEntry *entry = hash->buckets[i];
while (entry) {
STFileHashEntry *next = entry->next;
taosMemoryFree(entry);
entry = next;
}
}
taosMemoryFree(hash->buckets);
memset(hash, 0, sizeof(*hash));
}
static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
int32_t code = 0;
int32_t lino = 0;
{ // scan each file
STFileSet *fset = NULL;
TARRAY2_FOREACH(fs->fSetArr, fset) {
// data file
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) {
if (fset->farr[ftype] == NULL) continue;
code = tsdbFSDoScanAndFixFile(fs, fset->farr[ftype]);
TSDB_CHECK_CODE(code, lino, _exit);
}
// stt file
SSttLvl *lvl;
TARRAY2_FOREACH(fset->lvlArr, lvl) {
STFileObj *fobj;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
code = tsdbFSDoScanAndFixFile(fs, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
}
{ // clear unreferenced files
STfsDir *dir = tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path);
if (dir == NULL) {
code = TAOS_SYSTEM_ERROR(terrno);
lino = __LINE__;
goto _exit;
}
STFileHash fobjHash = {0};
code = tsdbFSCreateFileObjHash(fs, &fobjHash);
if (code) goto _close_dir;
for (const STfsFile *file = NULL; (file = tfsReaddir(dir)) != NULL;) {
if (taosIsDir(file->aname)) continue;
if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) {
remove_file(file->aname);
}
}
tsdbFSDestroyFileObjHash(&fobjHash);
_close_dir:
tfsClosedir(dir);
}
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
fs->neid = 0;
......@@ -356,8 +540,18 @@ static int32_t tsdbFSScanAndFix(STFileSystem *fs) {
const STFileSet *fset;
TARRAY2_FOREACH(fs->fSetArr, fset) { fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset)); }
// TODO
return 0;
// scan and fix
int32_t code = 0;
int32_t lino = 0;
code = tsdbFSDoSanAndFix(fs);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
}
return code;
}
static int32_t tsdbFSDupState(STFileSystem *fs) {
......
......@@ -41,7 +41,7 @@ static const struct {
[TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json},
};
static void remove_file(const char *fname) {
void remove_file(const char *fname) {
taosRemoveFile(fname);
tsdbInfo("file:%s is removed", fname);
}
......
......@@ -14,6 +14,7 @@
*/
#include "tsdb.h"
#include "vndCos.h"
/**
* @brief max key by precision
......@@ -76,9 +77,18 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) {
int32_t code = 0;
STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1;
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
int32_t size = taosArrayGetSize(pMsg->aSubmitTbData);
int32_t nlevel = tfsGetLevel(pTsdb->pVnode->pTfs);
if (nlevel > 1 && tsS3Enabled) {
if (nlevel == 3) {
minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1;
} else if (nlevel == 2) {
minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep0;
}
}
for (int32_t i = 0; i < size; ++i) {
SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i);
......
......@@ -16,6 +16,7 @@
#include "tencode.h"
#include "tmsg.h"
#include "vnd.h"
#include "vndCos.h"
#include "vnode.h"
#include "vnodeInt.h"
......@@ -190,7 +191,18 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
} else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
now *= 1000000;
}
TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2;
int32_t nlevel = tfsGetLevel(pVnode->pTfs);
int32_t keep = pVnode->config.tsdbCfg.keep2;
if (nlevel > 1 && tsS3Enabled) {
if (nlevel == 3) {
keep = pVnode->config.tsdbCfg.keep1;
} else if (nlevel == 2) {
keep = pVnode->config.tsdbCfg.keep0;
}
}
TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * keep;
TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision];
if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) {
uint64_t nColData;
......
......@@ -466,16 +466,16 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
taosThreadMutexInit(&pHandle->cfMutex, NULL);
pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
// rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create();
// int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2;
// rocksdb_env_set_low_priority_background_threads(env, nBGThread);
// rocksdb_env_set_high_priority_background_threads(env, nBGThread);
int32_t nBGThread = tsNumOfSnodeStreamThreads <= 2 ? 1 : tsNumOfSnodeStreamThreads / 2;
rocksdb_env_set_low_priority_background_threads(env, nBGThread);
rocksdb_env_set_high_priority_background_threads(env, nBGThread);
rocksdb_cache_t* cache = rocksdb_cache_create_lru(dbMemLimit / 2);
rocksdb_options_t* opts = rocksdb_options_create();
// rocksdb_options_set_env(opts, env);
rocksdb_options_set_env(opts, env);
rocksdb_options_set_create_if_missing(opts, 1);
rocksdb_options_set_create_missing_column_families(opts, 1);
rocksdb_options_set_max_total_wal_size(opts, dbMemLimit);
......@@ -484,8 +484,9 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
rocksdb_options_set_info_log_level(opts, 1);
rocksdb_options_set_db_write_buffer_size(opts, dbMemLimit);
rocksdb_options_set_write_buffer_size(opts, dbMemLimit / 2);
rocksdb_options_set_atomic_flush(opts, 1);
// pHandle->env = env;
pHandle->env = env;
pHandle->dbOpt = opts;
pHandle->cache = cache;
pHandle->filterFactory = rocksdb_compactionfilterfactory_create(
......@@ -520,7 +521,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId) {
_EXIT:
rocksdb_options_destroy(opts);
rocksdb_cache_destroy(cache);
// rocksdb_env_destroy(env);
rocksdb_env_destroy(env);
taosThreadMutexDestroy(&pHandle->mutex);
taosThreadMutexDestroy(&pHandle->cfMutex);
taosHashCleanup(pHandle->cfInst);
......@@ -543,20 +544,20 @@ void streamBackendCleanup(void* arg) {
taosHashCleanup(pHandle->cfInst);
if (pHandle->db) {
char* err = NULL;
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1);
rocksdb_flush(pHandle->db, flushOpt, &err);
if (err != NULL) {
qError("failed to flush db before streamBackend clean up, reason:%s", err);
taosMemoryFree(err);
}
rocksdb_flushoptions_destroy(flushOpt);
// char* err = NULL;
// rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
// rocksdb_flushoptions_set_wait(flushOpt, 1);
// rocksdb_flush(pHandle->db, flushOpt, &err);
// if (err != NULL) {
// qError("failed to flush db before streamBackend clean up, reason:%s", err);
// taosMemoryFree(err);
// }
// rocksdb_flushoptions_destroy(flushOpt);
rocksdb_close(pHandle->db);
}
rocksdb_options_destroy(pHandle->dbOpt);
// rocksdb_env_destroy(pHandle->env);
rocksdb_env_destroy(pHandle->env);
rocksdb_cache_destroy(pHandle->cache);
SListNode* head = tdListPopHead(pHandle->list);
......@@ -616,6 +617,7 @@ void streamBackendHandleCleanup(void* arg) {
}
}
taosMemoryFreeClear(wrapper->pHandle);
for (int i = 0; i < cfLen; i++) {
rocksdb_options_destroy(wrapper->cfOpts[i]);
rocksdb_block_based_options_destroy(((RocksdbCfParam*)wrapper->param)[i].tableOpt);
......@@ -789,8 +791,10 @@ int32_t chkpGetAllDbCfHandle(SStreamMeta* pMeta, rocksdb_column_family_handle_t*
rocksdb_column_family_handle_t* p = wrapper->pHandle[i];
size_t len = 0;
char* name = rocksdb_column_family_handle_get_name(p, &len);
qError("column name: name: %d", (int)len);
taosMemoryFree(name);
// char buf[64] = {0};
// memcpy(buf, name, len);
// qError("column name: name: %s, len: %d", buf, (int)len);
// taosMemoryFree(name);
taosArrayPush(pHandle, &p);
}
......@@ -834,8 +838,9 @@ _ERROR:
return code;
}
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
int code = -1;
int code = 0;
char* err = NULL;
rocksdb_flushoptions_t* flushOpt = rocksdb_flushoptions_create();
rocksdb_flushoptions_set_wait(flushOpt, 1);
......@@ -843,8 +848,8 @@ int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32
if (err != NULL) {
qError("failed to flush db before streamBackend clean up, reason:%s", err);
taosMemoryFree(err);
code = -1;
}
code = 0;
rocksdb_flushoptions_destroy(flushOpt);
return code;
}
......@@ -896,11 +901,10 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
}
// Get all cf and acquire cfWrappter
int32_t nCf = 0; // chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
int32_t nCf = chkpGetAllDbCfHandle(pMeta, &ppCf, refs);
qDebug("stream backend:%p start to do checkpoint at:%s, cf num: %d ", pHandle, pChkpIdDir, 0);
// code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
code = 0;
code = chkpPreFlushDb(pHandle->db, ppCf, nCf);
if (code == 0) {
code = chkpDoDbCheckpoint(pHandle->db, pChkpIdDir);
if (code != 0) {
......@@ -913,10 +917,10 @@ int32_t streamBackendDoCheckpoint(void* arg, uint64_t checkpointId) {
qError("stream backend:%p failed to flush db at:%s", pHandle, pChkpIdDir);
}
// release all ref to cfWrapper;
// for (int i = 0; i < taosArrayGetSize(refs); i++) {
// int64_t id = *(int64_t*)taosArrayGet(refs, i);
// taosReleaseRef(streamBackendCfWrapperId, id);
// }
for (int i = 0; i < taosArrayGetSize(refs); i++) {
int64_t id = *(int64_t*)taosArrayGet(refs, i);
taosReleaseRef(streamBackendCfWrapperId, id);
}
if (code == 0) {
taosWLockLatch(&pMeta->chkpDirLock);
taosArrayPush(pMeta->chkpSaved, &checkpointId);
......@@ -964,7 +968,8 @@ static rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const cha
rocksdb_snapshot_t** snapshot, rocksdb_readoptions_t** readOpt);
int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) {
int ret = memcmp(aBuf, bBuf, aLen);
int len = aLen < bLen ? aLen : bLen;
int ret = memcmp(aBuf, bBuf, len);
if (ret == 0) {
if (aLen < bLen)
return -1;
......@@ -1474,7 +1479,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
inst->pCompares = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
inst->dbOpt = handle->dbOpt;
// rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
rocksdb_writeoptions_disable_WAL(inst->wOpt, 1);
taosHashPut(handle->cfInst, idstr, strlen(idstr) + 1, &inst, sizeof(void*));
} else {
inst = *pInst;
......@@ -1595,7 +1600,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
taosThreadRwlockInit(&pBackendCfWrapper->rwLock, NULL);
SCfComparator compare = {.comp = pCompare, .numOfComp = cfLen};
pBackendCfWrapper->pComparNode = streamBackendAddCompare(handle, &compare);
// rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
rocksdb_writeoptions_disable_WAL(pBackendCfWrapper->writeOpts, 1);
memcpy(pBackendCfWrapper->idstr, pState->pTdbState->idstr, sizeof(pState->pTdbState->idstr));
int64_t id = taosAddRef(streamBackendCfWrapperId, pBackendCfWrapper);
......
......@@ -132,6 +132,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5
......
......@@ -5,8 +5,10 @@ parm_path=$(pwd ${parm_path})
echo "execute path:${parm_path}"
cd ${parm_path}
cp cases.task ${case_file}
# comment udf and stream case in windows
sed -i '/udf/d' ${case_file}
sed -i '/Udf/d' ${case_file}
sed -i '/stream/d' ${case_file}
sed -i '/^$/d' ${case_file}
sed -i '$a\%%FINISHED%%' ${case_file}
......
......@@ -812,6 +812,15 @@ class TDDnodes:
time.sleep(1)
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8").strip()
psCmd = "for /f %a in ('wmic process where \"name='tmq_sim'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
while(processID):
print(processID)
killCmd = "kill -9 %s > nul 2>&1" % processID
os.system(killCmd)
time.sleep(1)
processID = subprocess.check_output(
psCmd, shell=True).decode("utf-8").strip()
else:
psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs"
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip()
......
......@@ -26,8 +26,10 @@
./test.sh -f tsim/user/basic.sim
./test.sh -f tsim/user/password.sim
./test.sh -f tsim/user/privilege_db.sim
./test.sh -f tsim/user/privilege_sysinfo.sim
./test.sh -f tsim/user/privilege_topic.sim
./test.sh -f tsim/user/privilege_table.sim
./test.sh -f tsim/user/privilege_create_db.sim
./test.sh -f tsim/db/alter_option.sim
./test.sh -f tsim/db/alter_replica_31.sim
./test.sh -f tsim/db/basic1.sim
......@@ -183,6 +185,7 @@
./test.sh -f tsim/query/scalarNull.sim
./test.sh -f tsim/query/session.sim
./test.sh -f tsim/query/join_interval.sim
./test.sh -f tsim/query/join_pk.sim
./test.sh -f tsim/query/unionall_as_table.sim
./test.sh -f tsim/query/multi_order_by.sim
./test.sh -f tsim/query/sys_tbname.sim
......@@ -197,6 +200,7 @@
./test.sh -f tsim/query/tag_scan.sim
./test.sh -f tsim/query/nullColSma.sim
./test.sh -f tsim/query/bug3398.sim
./test.sh -f tsim/query/explain_tsorder.sim
./test.sh -f tsim/qnode/basic1.sim
./test.sh -f tsim/snode/basic1.sim
./test.sh -f tsim/mnode/basic1.sim
......
......@@ -226,7 +226,7 @@ class TDTestCase:
# init
def init(self, conn, logSql, replicaVar=1):
seed = time.clock_gettime(time.CLOCK_REALTIME)
seed = time.time() % 10000
random.seed(seed)
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
......
......@@ -93,19 +93,20 @@ class TDTestCase:
cfgPath = tdCom.getClientCfgPath()
taosLogFile = '%s/../log/taoslog*'%(cfgPath)
filterResultFile = '%s/../log/filter'%(cfgPath)
cmdStr = 'grep "process poll rsp, vgId:" %s >> %s'%(taosLogFile, filterResultFile)
cmdStr = 'grep -h "process poll rsp, vgId:" %s >> %s'%(taosLogFile, filterResultFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumerDict = {}
for index, line in enumerate(open(filterResultFile,'r')):
# tdLog.info("row[%d]: %s"%(index, line))
valueList = line.split(',')
# for i in range(len(valueList)):
# tdLog.info("index[%d]: %s"%(i, valueList[i]))
# get consumer id
list2 = valueList[0].split(':')
list3 = list2[4].split()
list3 = list2[3].split()
consumerId = list3[0]
print("consumerId: %s"%(consumerId))
......
import sys
import re
import time
import threading
from taos.tmq import *
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
self.db_name = "tmq_db"
self.topic_name = "tmq_topic"
self.stable_name = "stb"
self.rows_per_table = 1000
self.ctb_num = 100
def prepareData(self, precisionUnit="ms"):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
startTS = 1672502400000
if precisionUnit == "us":
startTS = 1672502400000000
elif precisionUnit == "ns":
startTS = 1672502400000000000
paraDict = {
'dbName': self.db_name,
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': self.stable_name,
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': self.ctb_num,
'rowsPerTbl': self.rows_per_table,
'batchNum': 100,
'startTs': startTS, # 2023-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 0
}
# init the consumer database
tmqCom.initConsumerTable()
# create testing database、stable、ctables
tdCom.create_database(tdSql, paraDict["dbName"], paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, precision=precisionUnit)
tdLog.info("create database %s successfully" % paraDict["dbName"])
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"], stbName=paraDict["stbName"])
tdLog.info("create stable %s successfully" % paraDict["stbName"])
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"], ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"], ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create child tables successfully")
# insert data into tables and wait the async thread exit
tdLog.info("insert data into tables")
pThread = tmqCom.asyncInsertDataByInterlace(paraDict)
pThread.join()
def run(self):
"""Check tmq feature for different data precision unit like "ms、us、ns"
"""
precision_unit = ["ms", "us", "ns"]
for unit in precision_unit:
tdLog.info(f"start to test precision unit {unit}")
self.prepareData(precisionUnit=unit)
# drop database if exists
tdSql.execute(f"drop database if exists {self.db_name}")
self.prepareData(unit)
# create topic
tdLog.info("create topic from %s" % self.stable_name)
queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(self.db_name, self.stable_name)
sqlString = "create topic %s as %s" %(self.topic_name, queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# save consumer info
consumerId = 0
expectrowcnt = self.rows_per_table * self.ctb_num
topicList = self.topic_name
ifcheckdata = 0
ifManualCommit = 0
keyList = 'group.id:cgrp1,\
enable.auto.commit:false,\
auto.commit.interval.ms:6000,\
auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt, topicList, keyList, ifcheckdata, ifManualCommit)
# start consume processor
paraDict = {
'pollDelay': 15,
'showMsg': 1,
'showRow': 1,
'snapshot': 0
}
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'], dbName=self.db_name, showMsg=paraDict['showMsg'], showRow=paraDict['showRow'], snapshot=paraDict['snapshot'])
tdLog.info("start to check consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
totalConsumeRows = 0
for i in range(expectRows):
totalConsumeRows += resultList[i]
tdSql.query(queryString)
totalRowsFromQuery = tdSql.getRows()
tdLog.info("act consume rows: %d, act query rows: %d "%(totalConsumeRows, totalRowsFromQuery))
if totalConsumeRows < totalRowsFromQuery:
tdLog.exit("tmq consume rows error!")
tmqCom.waitSubscriptionExit(tdSql, self.topic_name)
tdSql.query("drop topic %s" % self.topic_name)
tdSql.execute("drop database %s" % self.db_name)
def stop(self):
tdSql.execute(f"drop database if exists {self.db_name}")
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
......@@ -199,7 +199,7 @@ if __name__ == "__main__":
createDnodeNums = value
if key in ['-i', '--independentMnode']:
independentMnode = value
independentMnode = False
if key in ['-R', '--restful']:
restful = True
......@@ -553,6 +553,7 @@ if __name__ == "__main__":
else :
# dnode > 1 cluster
tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums))
print(independentMnode,"independentMnode valuse")
dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode)
tdDnodes = ClusterDnodes(dnodeslist)
tdDnodes.init(deployPath, masterIp)
......
......@@ -17,6 +17,7 @@ python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4
python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
python3 ./test.py -f 7-tmq/tmqShow.py
python3 ./test.py -f 7-tmq/tmqDropStb.py
python3 ./test.py -f 7-tmq/subscribeStb0.py
......@@ -133,6 +134,8 @@ python3 ./test.py -f 0-others/sysinfo.py
python3 ./test.py -f 0-others/user_control.py
python3 ./test.py -f 0-others/user_manage.py
python3 ./test.py -f 0-others/user_privilege.py
python3 ./test.py -f 0-others/user_privilege_show.py
python3 ./test.py -f 0-others/user_privilege_all.py
python3 ./test.py -f 0-others/fsync.py
python3 ./test.py -f 0-others/multilevel.py
python3 ./test.py -f 0-others/compatibility.py
......@@ -421,6 +424,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 -n 3
python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3
python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3
python3 ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1
python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
python3 ./test.py -f 6-cluster/5dnode3mnodeRecreateMnode.py -N 6 -M 3
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册