提交 dd759783 编写于 作者: sangshuduo's avatar sangshuduo

Merge branch '3.0' into feat/sangshuduo/TD-14141-update-taostools-for3.0

...@@ -227,8 +227,8 @@ static const SSysDbTableSchema transSchema[] = { ...@@ -227,8 +227,8 @@ static const SSysDbTableSchema transSchema[] = {
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stage", .bytes = TSDB_TRANS_STAGE_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db1", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "db2", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, {.name = "stable", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false},
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, {.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false},
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, {.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
{.name = "last_action_info", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "last_action_info", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
......
...@@ -165,8 +165,8 @@ typedef struct { ...@@ -165,8 +165,8 @@ typedef struct {
SEpSet lastEpset; SEpSet lastEpset;
tmsg_t lastMsgType; tmsg_t lastMsgType;
tmsg_t originRpcType; tmsg_t originRpcType;
char dbname1[TSDB_TABLE_FNAME_LEN]; char dbname[TSDB_TABLE_FNAME_LEN];
char dbname2[TSDB_TABLE_FNAME_LEN]; char stbname[TSDB_TABLE_FNAME_LEN];
int32_t startFunc; int32_t startFunc;
int32_t stopFunc; int32_t stopFunc;
int32_t paramLen; int32_t paramLen;
......
...@@ -71,7 +71,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); ...@@ -71,7 +71,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2); void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname);
void mndTransSetSerial(STrans *pTrans); void mndTransSetSerial(STrans *pTrans);
void mndTransSetOper(STrans *pTrans, EOperType oper); void mndTransSetOper(STrans *pTrans, EOperType oper);
......
...@@ -671,7 +671,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { ...@@ -671,7 +671,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr()); mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER; goto _OVER;
} }
mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); mndTransSetDbName(pTrans, createStreamReq.sourceDB, streamObj.targetDb); // hack way
mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name); mDebug("trans:%d, used to create stream:%s", pTrans->id, createStreamReq.name);
// create stb for stream // create stb for stream
......
...@@ -127,8 +127,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { ...@@ -127,8 +127,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
SDB_SET_INT8(pRaw, dataPos, 0, _OVER) SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->originRpcType, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->originRpcType, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER)
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
...@@ -289,8 +289,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { ...@@ -289,8 +289,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans->oper = oper; pTrans->oper = oper;
SDB_GET_INT16(pRaw, dataPos, &pTrans->originRpcType, _OVER) SDB_GET_INT16(pRaw, dataPos, &pTrans->originRpcType, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER) SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER)
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
...@@ -706,7 +706,7 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c ...@@ -706,7 +706,7 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
if (pIter == NULL) break; if (pIter == NULL) break;
if (pTrans->oper == oper) { if (pTrans->oper == oper) {
if (strcasecmp(dbname, pTrans->dbname1) == 0) { if (strcasecmp(dbname, pTrans->dbname) == 0) {
mInfo("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper); mInfo("trans:%d, db:%s oper:%d matched with input", pTrans->id, dbname, oper);
if (pTrans->pRpcArray == NULL) { if (pTrans->pRpcArray == NULL) {
pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo)); pTrans->pRpcArray = taosArrayInit(1, sizeof(SRpcHandleInfo));
...@@ -725,12 +725,12 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c ...@@ -725,12 +725,12 @@ int32_t mndSetRpcInfoForDbTrans(SMnode *pMnode, SRpcMsg *pMsg, EOperType oper, c
return code; return code;
} }
void mndTransSetDbName(STrans *pTrans, const char *dbname1, const char *dbname2) { void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbname) {
if (dbname1 != NULL) { if (dbname != NULL) {
tstrncpy(pTrans->dbname1, dbname1, TSDB_TABLE_FNAME_LEN); tstrncpy(pTrans->dbname, dbname, TSDB_TABLE_FNAME_LEN);
} }
if (dbname2 != NULL) { if (stbname != NULL) {
tstrncpy(pTrans->dbname2, dbname2, TSDB_TABLE_FNAME_LEN); tstrncpy(pTrans->stbname, stbname, TSDB_TABLE_FNAME_LEN);
} }
} }
...@@ -759,9 +759,9 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { ...@@ -759,9 +759,9 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
return 0; return 0;
} }
static bool mndCheckDbConflict(const char *db, STrans *pTrans) { static bool mndCheckDbConflict(const char *conflict, STrans *pTrans) {
if (db[0] == 0) return false; if (conflict[0] == 0) return false;
if (strcasecmp(db, pTrans->dbname1) == 0 || strcasecmp(db, pTrans->dbname2) == 0) return true; if (strcasecmp(conflict, pTrans->dbname) == 0 || strcasecmp(conflict, pTrans->stbname) == 0) return true;
return false; return false;
} }
...@@ -780,28 +780,28 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { ...@@ -780,28 +780,28 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
if (pNew->conflict == TRN_CONFLICT_DB) { if (pNew->conflict == TRN_CONFLICT_DB) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (mndCheckDbConflict(pNew->dbname1, pTrans)) conflict = true; if (mndCheckDbConflict(pNew->dbname, pTrans)) conflict = true;
if (mndCheckDbConflict(pNew->dbname2, pTrans)) conflict = true; if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true;
} }
} }
if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) { if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB) { if (pTrans->conflict == TRN_CONFLICT_DB) {
if (mndCheckDbConflict(pNew->dbname1, pTrans)) conflict = true; if (mndCheckDbConflict(pNew->dbname, pTrans)) conflict = true;
if (mndCheckDbConflict(pNew->dbname2, pTrans)) conflict = true; if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true;
} }
if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (mndCheckDbConflict(pNew->dbname2, pTrans)) conflict = true; // for stb if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true; // for stb
} }
} }
if (conflict) { if (conflict) {
mError("trans:%d, db1:%s db2:%s type:%d, can't execute since conflict with trans:%d db1:%s db2:%s type:%d", mError("trans:%d, db:%s stb:%s type:%d, can't execute since conflict with trans:%d db:%s stb:%s type:%d",
pNew->id, pNew->dbname1, pNew->dbname2, pNew->conflict, pTrans->id, pTrans->dbname1, pTrans->dbname2, pNew->id, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,
pTrans->conflict); pTrans->conflict);
} else { } else {
mDebug("trans:%d, db1:%s db2:%s type:%d, not conflict with trans:%d db1:%s db2:%s type:%d", pNew->id, mDebug("trans:%d, db:%s stb:%s type:%d, not conflict with trans:%d db:%s stb:%s type:%d", pNew->id,
pNew->dbname1, pNew->dbname2, pNew->conflict, pTrans->id, pTrans->dbname1, pTrans->dbname2, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname,
pTrans->conflict); pTrans->conflict);
} }
sdbRelease(pMnode->pSdb, pTrans); sdbRelease(pMnode->pSdb, pTrans);
...@@ -812,7 +812,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { ...@@ -812,7 +812,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (strlen(pTrans->dbname1) == 0 && strlen(pTrans->dbname2) == 0) { if (strlen(pTrans->dbname) == 0 && strlen(pTrans->stbname) == 0) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT; terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("trans:%d, failed to prepare conflict db not set", pTrans->id); mError("trans:%d, failed to prepare conflict db not set", pTrans->id);
return -1; return -1;
...@@ -913,12 +913,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { ...@@ -913,12 +913,12 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
if (pTrans->originRpcType == TDMT_MND_CREATE_DB) { if (pTrans->originRpcType == TDMT_MND_CREATE_DB) {
mInfo("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType)); mInfo("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType));
SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname1); SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname);
if (pDb != NULL) { if (pDb != NULL) {
for (int32_t j = 0; j < 12; j++) { for (int32_t j = 0; j < 12; j++) {
bool ready = mndIsDbReady(pMnode, pDb); bool ready = mndIsDbReady(pMnode, pDb);
if (!ready) { if (!ready) {
mInfo("trans:%d, db:%s not ready yet, wait %d times", pTrans->id, pTrans->dbname1, j); mInfo("trans:%d, db:%s not ready yet, wait %d times", pTrans->id, pTrans->dbname, j);
taosMsleep(1000); taosMsleep(1000);
} else { } else {
break; break;
...@@ -929,7 +929,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { ...@@ -929,7 +929,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
} else if (pTrans->originRpcType == TDMT_MND_CREATE_STB) { } else if (pTrans->originRpcType == TDMT_MND_CREATE_STB) {
void *pCont = NULL; void *pCont = NULL;
int32_t contLen = 0; int32_t contLen = 0;
if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname1, pTrans->dbname2, &pCont, &contLen) != 0) { if (0 == mndBuildSMCreateStbRsp(pMnode, pTrans->dbname, pTrans->stbname, &pCont, &contLen) != 0) {
mndTransSetRpcRsp(pTrans, pCont, contLen); mndTransSetRpcRsp(pTrans, pCont, contLen);
} }
} }
...@@ -1599,15 +1599,15 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl ...@@ -1599,15 +1599,15 @@ static int32_t mndRetrieveTrans(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)stage, false); colDataAppend(pColInfo, numOfRows, (const char *)stage, false);
char dbname1[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char dbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(dbname1, mndGetDbStr(pTrans->dbname1), pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndGetDbStr(pTrans->dbname), pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)dbname1, false); colDataAppend(pColInfo, numOfRows, (const char *)dbname, false);
char dbname2[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; char stbname[TSDB_DB_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(dbname2, mndGetDbStr(pTrans->dbname2), pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(stbname, mndGetDbStr(pTrans->stbname), pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)dbname2, false); colDataAppend(pColInfo, numOfRows, (const char *)stbname, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false); colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->failedTimes, false);
......
...@@ -31,45 +31,57 @@ class TDTestCase: ...@@ -31,45 +31,57 @@ class TDTestCase:
tdSql.prepare() tdSql.prepare()
for i in range(self.tbnum): for i in range(self.tbnum):
tdSql.execute(f'create table {self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.ttl_param}') tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.ttl_param}')
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(0) tdSql.checkRows(0)
for i in range(self.tbnum): for i in range(self.tbnum):
tdSql.execute(f'create table {self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.default_ttl}') tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.default_ttl}')
for i in range(int(self.tbnum/2)): for i in range(int(self.tbnum/2)):
tdSql.execute(f'alter table {self.ntbname}_{i} ttl {self.modify_ttl}') tdSql.execute(f'alter table db.{self.ntbname}_{i} ttl {self.modify_ttl}')
sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum - int(self.tbnum/2)) tdSql.checkRows(self.tbnum - int(self.tbnum/2))
tdSql.execute('drop database db') tdSql.execute('drop database db')
def ttl_check_ctb(self): def ttl_check_ctb(self):
tdSql.prepare() tdSql.prepare()
tdSql.execute(f'create table {self.stbname} (ts timestamp,c0 int) tags(t0 int)') tdSql.execute(f'create table db.{self.stbname} (ts timestamp,c0 int) tags(t0 int)')
for i in range(self.tbnum): for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({i}) ttl {self.ttl_param}') tdSql.execute(f'create table db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param}')
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(0) tdSql.checkRows(0)
for i in range(self.tbnum): for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({i}) ttl {self.default_ttl}') tdSql.execute(f'create table db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.default_ttl}')
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
for i in range(int(self.tbnum/2)): for i in range(int(self.tbnum/2)):
tdSql.execute(f'alter table {self.stbname}_{i} ttl {self.modify_ttl}') tdSql.execute(f'alter table db.{self.stbname}_{i} ttl {self.modify_ttl}')
sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum - int(self.tbnum/2)) tdSql.checkRows(self.tbnum - int(self.tbnum/2))
tdSql.execute('drop database db') tdSql.execute('drop database db')
def ttl_check_insert(self):
tdSql.prepare()
tdSql.execute(f'create table db.{self.stbname} (ts timestamp,c0 int) tags(t0 int)')
for i in range(self.tbnum):
tdSql.execute(f'insert into db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param} values(now,1)')
tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum)
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'])
tdSql.query(f'show db.tables')
tdSql.checkRows(0)
tdSql.execute('drop database db')
def run(self): def run(self):
self.ttl_check_ntb() self.ttl_check_ntb()
self.ttl_check_ctb() self.ttl_check_ctb()
self.ttl_check_insert()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
......
...@@ -32,7 +32,7 @@ python3 ./test.py -f 1-insert/block_wise.py ...@@ -32,7 +32,7 @@ python3 ./test.py -f 1-insert/block_wise.py
python3 ./test.py -f 1-insert/create_retentions.py python3 ./test.py -f 1-insert/create_retentions.py
python3 ./test.py -f 1-insert/table_param_ttl.py python3 ./test.py -f 1-insert/table_param_ttl.py
python3 ./test.py -f 1-insert/mutil_stage.py python3 ./test.py -f 1-insert/mutil_stage.py
python3 ./test.py -f 1-insert/table_param_ttl.py -R
python3 ./test.py -f 1-insert/update_data_muti_rows.py python3 ./test.py -f 1-insert/update_data_muti_rows.py
python3 ./test.py -f 1-insert/db_tb_name_check.py python3 ./test.py -f 1-insert/db_tb_name_check.py
......
...@@ -294,8 +294,8 @@ void dumpTrans(SSdb *pSdb, SJson *json) { ...@@ -294,8 +294,8 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
tjsonAddIntegerToObject(item, "conflict", pObj->conflict); tjsonAddIntegerToObject(item, "conflict", pObj->conflict);
tjsonAddIntegerToObject(item, "exec", pObj->exec); tjsonAddIntegerToObject(item, "exec", pObj->exec);
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime)); tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
tjsonAddStringToObject(item, "dbname1", pObj->dbname1); tjsonAddStringToObject(item, "dbname", pObj->dbname);
tjsonAddStringToObject(item, "dbname2", pObj->dbname2); tjsonAddStringToObject(item, "stbname", pObj->stbname);
tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions)); tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions));
tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions)); tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions));
tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions)); tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册