提交 9fc22fbf 编写于 作者: G Ganlin Zhao

Merge branch '3.0' into fix/TD-17511

...@@ -2210,7 +2210,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) { ...@@ -2210,7 +2210,7 @@ static int32_t smlParseTelnetLine(SSmlHandle *info, void *data) {
(SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen); (SSmlSTableMeta **)taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen);
if (tableMeta) { // update meta if (tableMeta) { // update meta
ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf); ret = smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, cols, &info->msgBuf);
if (!hasTable && ret) { if (!hasTable && ret == TSDB_CODE_SUCCESS) {
ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf); ret = smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, (*oneTable)->tags, &info->msgBuf);
} }
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
......
...@@ -44,6 +44,10 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM ...@@ -44,6 +44,10 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
SSdbRaw *pRaw = pMsg->pCont; SSdbRaw *pRaw = pMsg->pCont;
// delete msg handle
SRpcMsg rpcMsg = {0};
syncGetAndDelRespRpc(pMnode->syncMgmt.sync, cbMeta.seqNum, &rpcMsg.info);
int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw);
pMgmt->errCode = cbMeta.code; pMgmt->errCode = cbMeta.code;
mDebug("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 mDebug("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
......
...@@ -2807,7 +2807,6 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl ...@@ -2807,7 +2807,6 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
if (pCond->suid != 0) { if (pCond->suid != 0) {
(*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, (*ppReader)->suid, -1); (*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, (*ppReader)->suid, -1);
// ASSERT((*ppReader)->pSchema);
} else if (taosArrayGetSize(pTableList) > 0) { } else if (taosArrayGetSize(pTableList) > 0) {
STableKeyInfo* pKey = taosArrayGet(pTableList, 0); STableKeyInfo* pKey = taosArrayGet(pTableList, 0);
(*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, pKey->uid, -1); (*ppReader)->pSchema = metaGetTbTSchema((*ppReader)->pTsdb->pVnode->pMeta, pKey->uid, -1);
......
...@@ -71,6 +71,8 @@ int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode) { ...@@ -71,6 +71,8 @@ int32_t syncNodeRequestVotePeersSnapshot(SSyncNode* pSyncNode) {
} }
int32_t syncNodeElect(SSyncNode* pSyncNode) { int32_t syncNodeElect(SSyncNode* pSyncNode) {
syncNodeEventLog(pSyncNode, "begin election");
int32_t ret = 0; int32_t ret = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
syncNodeFollower2Candidate(pSyncNode); syncNodeFollower2Candidate(pSyncNode);
...@@ -120,12 +122,15 @@ int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, con ...@@ -120,12 +122,15 @@ int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, con
int32_t ret = 0; int32_t ret = 0;
do { do {
char host[128]; char host[64];
uint16_t port; uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port); syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-request-vote to %s:%d, {term:%" PRIu64 ", last-index:%" PRId64 ", last-term:%" PRIu64 char logBuf[256];
"}", snprintf(logBuf, sizeof(logBuf),
pSyncNode->vgId, host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm); "send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "", host, port,
pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(pSyncNode, logBuf);
} while (0); } while (0);
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
......
...@@ -999,7 +999,18 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -999,7 +999,18 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// init TLA+ log vars // init TLA+ log vars
pSyncNode->pLogStore = logStoreCreate(pSyncNode); pSyncNode->pLogStore = logStoreCreate(pSyncNode);
ASSERT(pSyncNode->pLogStore != NULL); ASSERT(pSyncNode->pLogStore != NULL);
pSyncNode->commitIndex = SYNC_INDEX_INVALID;
SyncIndex commitIndex = SYNC_INDEX_INVALID;
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
SSnapshot snapshot = {0};
int32_t code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
ASSERT(code == 0);
if (snapshot.lastApplyIndex > commitIndex) {
commitIndex = snapshot.lastApplyIndex;
syncNodeEventLog(pSyncNode, "reset commit index by snapshot");
}
}
pSyncNode->commitIndex = commitIndex;
// timer ms init // timer ms init
pSyncNode->pingBaseLine = PING_TIMER_MS; pSyncNode->pingBaseLine = PING_TIMER_MS;
...@@ -2061,21 +2072,21 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { ...@@ -2061,21 +2072,21 @@ void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER);
pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE;
syncNodeLog2("==state change syncNodeFollower2Candidate==", pSyncNode); syncNodeEventLog(pSyncNode, "follower to candidate");
} }
void syncNodeLeader2Follower(SSyncNode* pSyncNode) { void syncNodeLeader2Follower(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
syncNodeBecomeFollower(pSyncNode, "leader to follower"); syncNodeBecomeFollower(pSyncNode, "leader to follower");
syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode); syncNodeEventLog(pSyncNode, "leader to follower");
} }
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
syncNodeBecomeFollower(pSyncNode, "candidate to follower"); syncNodeBecomeFollower(pSyncNode, "candidate to follower");
syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode); syncNodeEventLog(pSyncNode, "candidate to follower");
} }
// raft vote -------------- // raft vote --------------
......
...@@ -45,8 +45,6 @@ ...@@ -45,8 +45,6 @@
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
int32_t ret = 0; int32_t ret = 0;
syncRequestVoteLog2("==syncNodeOnRequestVoteCb==", pMsg);
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
do { do {
...@@ -55,8 +53,8 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -55,8 +53,8 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
", maybe replica already dropped", "}, maybe replica already dropped",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm); host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(ths, logBuf); syncNodeEventLog(ths, logBuf);
} while (0); } while (0);
...@@ -98,8 +96,8 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -98,8 +96,8 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
", reply-grant:%d", "}, reply-grant:%d",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted); host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
syncNodeEventLog(ths, logBuf); syncNodeEventLog(ths, logBuf);
} while (0); } while (0);
...@@ -220,8 +218,8 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -220,8 +218,8 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
uint16_t port; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
", maybe replica already dropped", "}, maybe replica already dropped",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm); host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(ths, logBuf); syncNodeEventLog(ths, logBuf);
} while (0); } while (0);
...@@ -262,7 +260,7 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { ...@@ -262,7 +260,7 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf), snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
", reply-grant:%d}", "}, reply-grant:%d",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted); host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
syncNodeEventLog(ths, logBuf); syncNodeEventLog(ths, logBuf);
} while (0); } while (0);
......
...@@ -40,22 +40,41 @@ ...@@ -40,22 +40,41 @@
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
// print log // trace log
char logBuf[128] = {0}; do {
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnRequestVoteReplyCb== term:%" PRIu64, ths->pRaftStore->currentTerm); char host[64];
syncRequestVoteReplyLog2(logBuf, pMsg); uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host,
port, pMsg->term, pMsg->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
sInfo("recv SyncRequestVoteReply, maybe replica already dropped"); char host[64];
return ret; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1;
} }
// drop stale response // drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("recv SyncRequestVoteReply, drop stale response, receive_term:%" PRIu64 " current_term:%" PRIu64, pMsg->term, char host[64];
ths->pRaftStore->currentTerm); uint16_t port;
return ret; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1;
} }
// ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm)); // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm));
...@@ -65,12 +84,14 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) ...@@ -65,12 +84,14 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
// } // }
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128] = {0}; char host[64];
snprintf(logBuf, sizeof(logBuf), "syncNodeOnRequestVoteReplyCb error term, receive:%" PRIu64 " current:%" PRIu64, uint16_t port;
pMsg->term, ths->pRaftStore->currentTerm); syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
syncNodePrint2(logBuf, ths); char logBuf[256];
sError("%s", logBuf); snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
return ret; host, port, pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1;
} }
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
...@@ -99,7 +120,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) ...@@ -99,7 +120,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
} }
} }
return ret; return 0;
} }
#if 0 #if 0
...@@ -164,22 +185,41 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) ...@@ -164,22 +185,41 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0; int32_t ret = 0;
// print log // trace log
char logBuf[128] = {0}; do {
snprintf(logBuf, sizeof(logBuf), "recv SyncRequestVoteReply, term:%" PRIu64, ths->pRaftStore->currentTerm); char host[64];
syncRequestVoteReplyLog2(logBuf, pMsg); uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host,
port, pMsg->term, pMsg->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
sInfo("recv SyncRequestVoteReply, maybe replica already dropped"); char host[64];
return ret; uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1;
} }
// drop stale response // drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) { if (pMsg->term < ths->pRaftStore->currentTerm) {
sTrace("recv SyncRequestVoteReply, drop stale response, receive_term:%" PRIu64 " current_term:%" PRIu64, pMsg->term, char host[64];
ths->pRaftStore->currentTerm); uint16_t port;
return ret; syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
return -1;
} }
// ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm)); // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm));
...@@ -189,13 +229,14 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl ...@@ -189,13 +229,14 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
// } // }
if (pMsg->term > ths->pRaftStore->currentTerm) { if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128] = {0}; char host[64];
snprintf(logBuf, sizeof(logBuf), uint16_t port;
"recv SyncRequestVoteReply, error term, receive_term:%" PRIu64 " current_term:%" PRIu64, pMsg->term, syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
ths->pRaftStore->currentTerm); char logBuf[256];
syncNodePrint2(logBuf, ths); snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
sError("%s", logBuf); host, port, pMsg->term, pMsg->voteGranted);
return ret; syncNodeErrorLog(ths, logBuf);
return -1;
} }
ASSERT(pMsg->term == ths->pRaftStore->currentTerm); ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
...@@ -224,5 +265,5 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl ...@@ -224,5 +265,5 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
} }
} }
return ret; return 0;
} }
\ No newline at end of file
...@@ -573,6 +573,12 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap ...@@ -573,6 +573,12 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex; pReceiver->pSyncNode->commitIndex = pReceiver->snapshot.lastApplyIndex;
} }
// maybe update term
if (pReceiver->snapshot.lastApplyTerm > pReceiver->pSyncNode->pRaftStore->currentTerm) {
pReceiver->pSyncNode->pRaftStore->currentTerm = pReceiver->snapshot.lastApplyTerm;
raftStorePersist(pReceiver->pSyncNode->pRaftStore);
}
// stop writer, apply data // stop writer, apply data
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true, code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
&(pReceiver->snapshot)); &(pReceiver->snapshot));
......
...@@ -392,7 +392,7 @@ typedef struct SDelayQueue { ...@@ -392,7 +392,7 @@ typedef struct SDelayQueue {
} SDelayQueue; } SDelayQueue;
int transDQCreate(uv_loop_t* loop, SDelayQueue** queue); int transDQCreate(uv_loop_t* loop, SDelayQueue** queue);
void transDQDestroy(SDelayQueue* queue); void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg));
int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs); int transDQSched(SDelayQueue* queue, void (*func)(void* arg), void* arg, uint64_t timeoutMs);
bool transEpSetIsEqual(SEpSet* a, SEpSet* b); bool transEpSetIsEqual(SEpSet* a, SEpSet* b);
......
...@@ -140,7 +140,7 @@ static void destroyUserdata(STransMsg* userdata); ...@@ -140,7 +140,7 @@ static void destroyUserdata(STransMsg* userdata);
static int cliRBChoseIdx(STrans* pTransInst); static int cliRBChoseIdx(STrans* pTransInst);
static void destroyCmsg(SCliMsg* cmsg); static void destroyCmsg(void* cmsg);
static void transDestroyConnCtx(STransConnCtx* ctx); static void transDestroyConnCtx(STransConnCtx* ctx);
// thread obj // thread obj
static SCliThrd* createThrdObj(); static SCliThrd* createThrdObj();
...@@ -198,6 +198,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) { ...@@ -198,6 +198,7 @@ static void cliReleaseUnfinishedMsg(SCliConn* conn) {
} \ } \
destroyCmsg(pMsg); \ destroyCmsg(pMsg); \
cliReleaseUnfinishedMsg(conn); \ cliReleaseUnfinishedMsg(conn); \
transQueueClear(&conn->cliMsgs); \
addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \ addConnToPool(((SCliThrd*)conn->hostThrd)->pool, conn); \
return; \ return; \
} \ } \
...@@ -545,6 +546,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { ...@@ -545,6 +546,7 @@ static void addConnToPool(void* pool, SCliConn* conn) {
STrans* pTransInst = thrd->pTransInst; STrans* pTransInst = thrd->pTransInst;
conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime); conn->expireTime = taosGetTimestampMs() + CONN_PERSIST_TIME(pTransInst->idleTime);
cliReleaseUnfinishedMsg(conn);
transQueueClear(&conn->cliMsgs); transQueueClear(&conn->cliMsgs);
transCtxCleanup(&conn->ctx); transCtxCleanup(&conn->ctx);
conn->status = ConnInPool; conn->status = ConnInPool;
...@@ -645,6 +647,7 @@ static void cliDestroy(uv_handle_t* handle) { ...@@ -645,6 +647,7 @@ static void cliDestroy(uv_handle_t* handle) {
conn->stream->data = NULL; conn->stream->data = NULL;
taosMemoryFree(conn->stream); taosMemoryFree(conn->stream);
transCtxCleanup(&conn->ctx); transCtxCleanup(&conn->ctx);
cliReleaseUnfinishedMsg(conn);
transQueueDestroy(&conn->cliMsgs); transQueueDestroy(&conn->cliMsgs);
tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); tTrace("%s conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
transReqQueueClear(&conn->wreqQueue); transReqQueueClear(&conn->wreqQueue);
...@@ -962,7 +965,8 @@ static void destroyUserdata(STransMsg* userdata) { ...@@ -962,7 +965,8 @@ static void destroyUserdata(STransMsg* userdata) {
transFreeMsg(userdata->pCont); transFreeMsg(userdata->pCont);
userdata->pCont = NULL; userdata->pCont = NULL;
} }
static void destroyCmsg(SCliMsg* pMsg) { static void destroyCmsg(void* arg) {
SCliMsg* pMsg = arg;
if (pMsg == NULL) { if (pMsg == NULL) {
return; return;
} }
...@@ -1001,7 +1005,7 @@ static void destroyThrdObj(SCliThrd* pThrd) { ...@@ -1001,7 +1005,7 @@ static void destroyThrdObj(SCliThrd* pThrd) {
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg); TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SCliMsg, destroyCmsg);
transDestroyAsyncPool(pThrd->asyncPool); transDestroyAsyncPool(pThrd->asyncPool);
transDQDestroy(pThrd->delayQueue); transDQDestroy(pThrd->delayQueue, destroyCmsg);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
} }
......
...@@ -456,7 +456,7 @@ int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) { ...@@ -456,7 +456,7 @@ int transDQCreate(uv_loop_t* loop, SDelayQueue** queue) {
return 0; return 0;
} }
void transDQDestroy(SDelayQueue* queue) { void transDQDestroy(SDelayQueue* queue, void (*freeFunc)(void* arg)) {
taosMemoryFree(queue->timer); taosMemoryFree(queue->timer);
while (heapSize(queue->heap) > 0) { while (heapSize(queue->heap) > 0) {
...@@ -467,6 +467,11 @@ void transDQDestroy(SDelayQueue* queue) { ...@@ -467,6 +467,11 @@ void transDQDestroy(SDelayQueue* queue) {
heapRemove(queue->heap, minNode); heapRemove(queue->heap, minNode);
SDelayTask* task = container_of(minNode, SDelayTask, node); SDelayTask* task = container_of(minNode, SDelayTask, node);
STaskArg* arg = task->arg;
freeFunc(arg->param1);
taosMemoryFree(arg);
taosMemoryFree(task); taosMemoryFree(task);
} }
heapDestroy(queue->heap); heapDestroy(queue->heap);
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
./test.sh -f tsim/db/basic4.sim ./test.sh -f tsim/db/basic4.sim
./test.sh -f tsim/db/basic5.sim ./test.sh -f tsim/db/basic5.sim
./test.sh -f tsim/db/basic6.sim ./test.sh -f tsim/db/basic6.sim
# nojira ./test.sh -f tsim/db/commit.sim ./test.sh -f tsim/db/commit.sim
./test.sh -f tsim/db/create_all_options.sim ./test.sh -f tsim/db/create_all_options.sim
./test.sh -f tsim/db/delete_reuse1.sim ./test.sh -f tsim/db/delete_reuse1.sim
./test.sh -f tsim/db/delete_reuse2.sim ./test.sh -f tsim/db/delete_reuse2.sim
...@@ -27,7 +27,7 @@ ...@@ -27,7 +27,7 @@
./test.sh -f tsim/db/delete_writing2.sim ./test.sh -f tsim/db/delete_writing2.sim
# unsupport ./test.sh -f tsim/db/dropdnodes.sim # unsupport ./test.sh -f tsim/db/dropdnodes.sim
./test.sh -f tsim/db/error1.sim ./test.sh -f tsim/db/error1.sim
# nojira ./test.sh -f tsim/db/keep.sim # jira ./test.sh -f tsim/db/keep.sim
./test.sh -f tsim/db/len.sim ./test.sh -f tsim/db/len.sim
./test.sh -f tsim/db/repeat.sim ./test.sh -f tsim/db/repeat.sim
./test.sh -f tsim/db/show_create_db.sim ./test.sh -f tsim/db/show_create_db.sim
...@@ -88,7 +88,7 @@ ...@@ -88,7 +88,7 @@
./test.sh -f tsim/parser/alter__for_community_version.sim ./test.sh -f tsim/parser/alter__for_community_version.sim
./test.sh -f tsim/parser/alter_column.sim ./test.sh -f tsim/parser/alter_column.sim
./test.sh -f tsim/parser/alter_stable.sim ./test.sh -f tsim/parser/alter_stable.sim
# nojira ./test.sh -f tsim/parser/auto_create_tb.sim # jira ./test.sh -f tsim/parser/auto_create_tb.sim
./test.sh -f tsim/parser/auto_create_tb_drop_tb.sim ./test.sh -f tsim/parser/auto_create_tb_drop_tb.sim
./test.sh -f tsim/parser/between_and.sim ./test.sh -f tsim/parser/between_and.sim
./test.sh -f tsim/parser/binary_escapeCharacter.sim ./test.sh -f tsim/parser/binary_escapeCharacter.sim
......
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/cfg.sh -n dnode1 -c walLevel -v 2
system sh/cfg.sh -n dnode2 -c walLevel -v 2
system sh/cfg.sh -n dnode3 -c walLevel -v 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
print ========= start dnode1 as master print ========= start dnode1 as master
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sql connect sql connect
sleep 2000
print ========= start other dnodes print ========= start other dnodes
sql create dnode $hostname2 sql create dnode $hostname port 7200
system sh/exec.sh -n dnode2 -s start
sleep 2000 $x = 0
step1:
$ = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print ======== step1 create db print ======== step1 create db
sql create database commitdb replica 1 duration 7 keep 30 sql create database commitdb replica 1 duration 7 keep 30
...@@ -68,9 +76,7 @@ $num = $rows + 2 ...@@ -68,9 +76,7 @@ $num = $rows + 2
print ======== step3 import old data print ======== step3 import old data
sql import into tb values (now - 10d , -10 ) sql import into tb values (now - 10d , -10 )
sql import into tb values (now - 11d , -11 ) sql import into tb values (now - 11d , -11 )
sql select * from tb order by ts desc sql select * from tb order by ts desc
print ===> rows $rows expect $num print ===> rows $rows expect $num
print ===> last $data01 expect $data01 print ===> last $data01 expect $data01
...@@ -99,9 +105,7 @@ endi ...@@ -99,9 +105,7 @@ endi
print ======== step5 stop dnode print ======== step5 stop dnode
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
sleep 3000
system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode2 -s start
sleep 3000
sql select * from tb sql select * from tb
print ===> rows $rows print ===> rows $rows
...@@ -116,10 +120,4 @@ if $data01 != 40 then ...@@ -116,10 +120,4 @@ if $data01 != 40 then
endi endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT \ No newline at end of file
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
system sh/stop_dnodes.sh system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode3 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode4 -c transPullupInterval -v 1
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
system sh/exec.sh -n dnode2 -s start
sql connect sql connect
print =============== step1 create dnode2
sql create dnode $hostname port 7200
$x = 0
step1:
$x = $x + 1
sleep 1000
if $x == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $data00 $data01 $data02 $data03 $data04 $data05
print ===> $data10 $data11 $data12 $data13 $data14 $data15
if $rows != 2 then
return -1
endi
if $data(1)[4] != ready then
goto step1
endi
if $data(2)[4] != ready then
goto step1
endi
print ======== step1 create db print ======== step1 create db
sql create database keepdb replica 1 keep 30 duration 7 sql create database keepdb replica 1 keep 30 duration 7 vgroups 2
sql use keepdb sql use keepdb
sql create table tb (ts timestamp, i int) sql create table tb (ts timestamp, i int)
$x = 1 $x = 1
while $x < 41 while $x < 41
$time = $x . d $time = $x . d
sql insert into tb values (now + $time , $x ) -x step2 sql insert into tb values (now - $time , $x ) -x step2
step2: step2:
$x = $x + 1 $x = $x + 1
endw endw
sql select * from tb sql select * from tb
print ===> rows $rows print ===> rows $rows last $data01
print ===> last $data01
if $rows >= 40 then if $rows >= 40 then
return -1 return -1
endi endi
...@@ -61,9 +27,7 @@ system sh/exec.sh -n dnode2 -s stop -x SIGINT ...@@ -61,9 +27,7 @@ system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode2 -s start
sql select * from tb sql select * from tb
print ===> rows $rows print ===> rows $rows last $data01
print ===> last $data01
if $rows >= 40 then if $rows >= 40 then
return -1 return -1
endi endi
...@@ -75,23 +39,13 @@ $num1 = $rows + 40 ...@@ -75,23 +39,13 @@ $num1 = $rows + 40
print ======== step3 alter db print ======== step3 alter db
sql alter database keepdb keep 60 sql alter database keepdb keep 60
flush database keepdb sql flush database keepdb
sql show databases sql show databases
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07
if $data02 != 1 then if $data22 != 2 then
return -1 return -1
endi endi
if $data03 != 1 then if $data27 != 86400m,86400m,86400m then
return -1
endi
if $data04 != 1 then
return -1
endi
if $data05 != 7 then
return -1
endi
if $data06 != 60 then
return -1 return -1
endi endi
...@@ -99,98 +53,73 @@ print ======== step4 insert data ...@@ -99,98 +53,73 @@ print ======== step4 insert data
$x = 41 $x = 41
while $x < 81 while $x < 81
$time = $x . d $time = $x . d
sql insert into tb values (now + $time , $x ) sql insert into tb values (now - $time , $x ) -x step4
step4:
$x = $x + 1 $x = $x + 1
endw endw
sql select * from tb sql select * from tb
print ===> rows $rows print ===> rows $rows last $data01
print ===> last $data01 if $rows >= 80 then
if $rows != $num1 then
return -1 return -1
endi endi
if $data01 != 80 then if $rows <= 50 then
return -1 return -1
endi endi
return
print ======== step5 stop dnode print ======== step5 stop dnode
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGKILL
system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode2 -s start
sql select * from tb sql select * from tb
print ===> rows $rows print ===> rows $rows last $data01
print ===> last $data01 if $rows >= 80 then
if $rows >= $num1 then
return -1 return -1
endi endi
if $rows <= 50 then if $rows <= 50 then
return -1 return -1
endi endi
if $data01 != 80 then
return -1
endi
print ======== step6 alter db print ======== step6 alter db
sql alter database keepdb keep 30 sql alter database keepdb keep 30
sleep 1000
sql show databases sql show databases
print $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 if $data22 != 2 then
if $data02 != 1 then
return -1
endi
if $data03 != 1 then
return -1
endi
if $data04 != 1 then
return -1
endi
if $data05 != 7 then
return -1 return -1
endi endi
if $data06 != 30 then if $data27 != 43200m,43200m,43200m then
return -1 return -1
endi endi
print ======== step7 stop dnode print ======== step7 stop dnode
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGKILL
sleep 2000
system sh/exec.sh -n dnode2 -s start system sh/exec.sh -n dnode2 -s start
sleep 2000
sql select * from tb sql select * from tb
print ===> rows $rows print ===> rows $rows last $data01
print ===> last $data01
if $rows >= 40 then if $rows >= 40 then
return -1 return -1
endi endi
if $rows <= 20 then if $rows <= 20 then
return -1 return -1
endi endi
if $data01 != 80 then
return -1
endi
$num3 = $rows + 40
print ======== step8 insert data print ======== step8 insert data
$x = 81 $x = 81
while $x < 121 while $x < 121
$time = $x . d $time = $x . d
sql insert into tb values (now + $time , $x ) sql insert into tb values (now - $time , $x ) -x step4
step4:
$x = $x + 1 $x = $x + 1
endw endw
sql select * from tb sql select * from tb
print ===> rows $rows print ===> rows $rows last $data01
print ===> last $data01 if $rows >= 40 then
if $rows != $num3 then
return -1 return -1
endi endi
if $data01 != 120 then if $rows <= 20 then
return -1 return -1
endi endi
...@@ -208,4 +137,6 @@ sql alter database keepdb duration 1 -x error3 ...@@ -208,4 +137,6 @@ sql alter database keepdb duration 1 -x error3
error3: error3:
print ======= test success print ======= test success
\ No newline at end of file system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
\ No newline at end of file
...@@ -90,7 +90,7 @@ $null= ...@@ -90,7 +90,7 @@ $null=
system_content sh/checkValgrind.sh -n dnode1 system_content sh/checkValgrind.sh -n dnode1
print cmd return result ----> [ $system_content ] print cmd return result ----> [ $system_content ]
if $system_content > 0 then if $system_content > 2 then
return -1 return -1
endi endi
......
...@@ -211,10 +211,10 @@ class TDTestCase: ...@@ -211,10 +211,10 @@ class TDTestCase:
for error in [constant.INT_UN_MIN-1,constant.INT_UN_MAX+1]: for error in [constant.INT_UN_MIN-1,constant.INT_UN_MAX+1]:
tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}') tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}')
#! bug TD-17106 #! bug TD-17106
# elif v.lower() == 'bigint unsigned': elif v.lower() == 'bigint unsigned':
# self.tag_check(i,k,tag_unbigint) self.tag_check(i,k,tag_unbigint)
# for error in [constant.BIGINT_UN_MIN-1,constant.BIGINT_UN_MAX+1]: for error in [constant.BIGINT_UN_MIN-1,constant.BIGINT_UN_MAX+1]:
# tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}') tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}')
elif v.lower() == 'bool': elif v.lower() == 'bool':
self.tag_check(i,k,tag_bool) self.tag_check(i,k,tag_bool)
elif v.lower() == 'float': elif v.lower() == 'float':
...@@ -225,8 +225,8 @@ class TDTestCase: ...@@ -225,8 +225,8 @@ class TDTestCase:
else: else:
tdLog.exit(f'select {k} from {self.stbname}_{i},data check failure') tdLog.exit(f'select {k} from {self.stbname}_{i},data check failure')
#! bug TD-17106 #! bug TD-17106
# for error in [constant.FLOAT_MIN*1.1,constant.FLOAT_MAX*1.1]: for error in [constant.FLOAT_MIN*1.1,constant.FLOAT_MAX*1.1]:
# tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}') tdSql.error(f'alter table {self.stbname}_{i} set tag {k} = {error}')
elif v.lower() == 'double': elif v.lower() == 'double':
tdSql.execute(f'alter table {self.stbname}_{i} set tag {k} = {tag_double}') tdSql.execute(f'alter table {self.stbname}_{i} set tag {k} = {tag_double}')
tdSql.query(f'select {k} from {self.stbname}_{i}') tdSql.query(f'select {k} from {self.stbname}_{i}')
......
...@@ -25,12 +25,13 @@ from util.sqlset import TDSetSql ...@@ -25,12 +25,13 @@ from util.sqlset import TDSetSql
class TDTestCase: class TDTestCase:
def init(self, conn, logSql): def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__) tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(),logSql) tdSql.init(conn.cursor())
self.dbname = 'db_test' self.dbname = 'db_test'
self.setsql = TDSetSql() self.setsql = TDSetSql()
self.stbname = 'stb'
self.ntbname = 'ntb' self.ntbname = 'ntb'
self.rowNum = 10 self.rowNum = 5
self.tbnum = 20 self.tbnum = 2
self.ts = 1537146000000 self.ts = 1537146000000
self.binary_str = 'taosdata' self.binary_str = 'taosdata'
self.nchar_str = '涛思数据' self.nchar_str = '涛思数据'
...@@ -51,6 +52,7 @@ class TDTestCase: ...@@ -51,6 +52,7 @@ class TDTestCase:
'col13': f'nchar({self.str_length})', 'col13': f'nchar({self.str_length})',
} }
self.tinyint_val = random.randint(constant.TINYINT_MIN,constant.TINYINT_MAX) self.tinyint_val = random.randint(constant.TINYINT_MIN,constant.TINYINT_MAX)
self.smallint_val = random.randint(constant.SMALLINT_MIN,constant.SMALLINT_MAX) self.smallint_val = random.randint(constant.SMALLINT_MIN,constant.SMALLINT_MAX)
self.int_val = random.randint(constant.INT_MIN,constant.INT_MAX) self.int_val = random.randint(constant.INT_MIN,constant.INT_MAX)
...@@ -107,32 +109,50 @@ class TDTestCase: ...@@ -107,32 +109,50 @@ class TDTestCase:
tdSql.execute(f'''insert into {tbname} values({self.ts+i},"{base_data['binary']}")''') tdSql.execute(f'''insert into {tbname} values({self.ts+i},"{base_data['binary']}")''')
elif 'nchar' in col_type.lower(): elif 'nchar' in col_type.lower():
tdSql.execute(f'''insert into {tbname} values({self.ts+i},"{base_data['nchar']}")''') tdSql.execute(f'''insert into {tbname} values({self.ts+i},"{base_data['nchar']}")''')
def delete_all_data(self,tbname,col_type,row_num,base_data,dbname,tb_type,tb_num=1):
def delete_all_data(self,tbname,col_type,row_num,base_data,dbname):
tdSql.execute(f'delete from {tbname}') tdSql.execute(f'delete from {tbname}')
tdSql.execute(f'flush database {dbname}') tdSql.execute(f'flush database {dbname}')
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.query(f'select * from {tbname}') tdSql.query(f'select * from {tbname}')
tdSql.checkRows(0) tdSql.checkRows(0)
self.insert_base_data(col_type,tbname,row_num,base_data) if tb_type == 'ntb' or tb_type == 'ctb':
self.insert_base_data(col_type,tbname,row_num,base_data)
elif tb_type == 'stb':
for i in range(tb_num):
self.insert_base_data(col_type,f'{tbname}_{i}',row_num,base_data)
tdSql.execute(f'flush database {dbname}') tdSql.execute(f'flush database {dbname}')
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.query(f'select * from {tbname}') tdSql.query(f'select * from {tbname}')
tdSql.checkRows(row_num) if tb_type == 'ntb' or tb_type == 'ctb':
def delete_one_row(self,tbname,column_type,column_name,base_data,dbname): tdSql.checkRows(row_num)
elif tb_type =='stb':
tdSql.checkRows(row_num*tb_num)
def delete_one_row(self,tbname,column_type,column_name,base_data,row_num,dbname,tb_type,tb_num=1):
tdSql.execute(f'delete from {tbname} where ts={self.ts}') tdSql.execute(f'delete from {tbname} where ts={self.ts}')
tdSql.execute(f'flush database {dbname}') tdSql.execute(f'flush database {dbname}')
tdSql.execute('reset query cache') tdSql.execute('reset query cache')
tdSql.query(f'select {column_name} from {tbname}') tdSql.query(f'select {column_name} from {tbname}')
tdSql.checkRows(self.rowNum-1) if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(row_num-1)
elif tb_type == 'stb':
tdSql.checkRows((row_num-1)*tb_num)
tdSql.query(f'select {column_name} from {tbname} where ts={self.ts}') tdSql.query(f'select {column_name} from {tbname} where ts={self.ts}')
tdSql.checkRows(0) tdSql.checkRows(0)
if 'binary' in column_type.lower(): if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.execute(f'''insert into {tbname} values({self.ts},"{base_data['binary']}")''') if 'binary' in column_type.lower():
elif 'nchar' in column_type.lower(): tdSql.execute(f'''insert into {tbname} values({self.ts},"{base_data['binary']}")''')
tdSql.execute(f'''insert into {tbname} values({self.ts},"{base_data['nchar']}")''') elif 'nchar' in column_type.lower():
else: tdSql.execute(f'''insert into {tbname} values({self.ts},"{base_data['nchar']}")''')
tdSql.execute(f'insert into {tbname} values({self.ts},{base_data[column_type]})') else:
tdSql.execute(f'insert into {tbname} values({self.ts},{base_data[column_type]})')
elif tb_type == 'stb':
for i in range(tb_num):
if 'binary' in column_type.lower():
tdSql.execute(f'''insert into {tbname}_{i} values({self.ts},"{base_data['binary']}")''')
elif 'nchar' in column_type.lower():
tdSql.execute(f'''insert into {tbname}_{i} values({self.ts},"{base_data['nchar']}")''')
else:
tdSql.execute(f'insert into {tbname}_{i} values({self.ts},{base_data[column_type]})')
tdSql.query(f'select {column_name} from {tbname} where ts={self.ts}') tdSql.query(f'select {column_name} from {tbname} where ts={self.ts}')
if column_type.lower() == 'float' or column_type.lower() == 'double': if column_type.lower() == 'float' or column_type.lower() == 'double':
if abs(tdSql.queryResult[0][0] - base_data[column_type]) / base_data[column_type] <= 0.0001: if abs(tdSql.queryResult[0][0] - base_data[column_type]) / base_data[column_type] <= 0.0001:
...@@ -144,12 +164,56 @@ class TDTestCase: ...@@ -144,12 +164,56 @@ class TDTestCase:
elif 'nchar' in column_type.lower(): elif 'nchar' in column_type.lower():
tdSql.checkEqual(tdSql.queryResult[0][0],base_data['nchar']) tdSql.checkEqual(tdSql.queryResult[0][0],base_data['nchar'])
else: else:
tdSql.checkEqual(tdSql.queryResult[0][0],base_data[column_type]) tdSql.checkEqual(tdSql.queryResult[0][0],base_data[column_type])
def delete_rows(self,dbname,tbname,col_name,col_type,base_data,row_num,tb_type,tb_num=1):
def delete_rows(self): for i in range(row_num):
tdSql.execute(f'delete from {tbname} where ts>{self.ts+i}')
tdSql.execute(f'flush database {dbname}')
pass tdSql.execute('reset query cache')
tdSql.query(f'select {col_name} from {tbname}')
if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(i+1)
self.insert_base_data(col_type,tbname,row_num,base_data)
elif tb_type == 'stb':
tdSql.checkRows((i+1)*tb_num)
for j in range(tb_num):
self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data)
for i in range(row_num):
tdSql.execute(f'delete from {tbname} where ts>={self.ts+i}')
tdSql.execute(f'flush database {dbname}')
tdSql.execute('reset query cache')
tdSql.query(f'select {col_name} from {tbname}')
if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(i)
self.insert_base_data(col_type,tbname,row_num,base_data)
elif tb_type == 'stb':
tdSql.checkRows(i*tb_num)
for j in range(tb_num):
self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data)
for i in range(row_num):
tdSql.execute(f'delete from {tbname} where ts<={self.ts+i}')
tdSql.execute(f'flush database {dbname}')
tdSql.execute('reset query cache')
tdSql.query(f'select {col_name} from {tbname}')
if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(row_num-i-1)
self.insert_base_data(col_type,tbname,row_num,base_data)
elif tb_type == 'stb':
tdSql.checkRows((row_num-i-1)*tb_num)
for j in range(tb_num):
self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data)
for i in range(row_num):
tdSql.execute(f'delete from {tbname} where ts<{self.ts+i}')
tdSql.execute(f'flush database {dbname}')
tdSql.execute('reset query cache')
tdSql.query(f'select {col_name} from {tbname}')
if tb_type == 'ntb' or tb_type == 'ctb':
tdSql.checkRows(row_num-i)
self.insert_base_data(col_type,tbname,row_num,base_data)
elif tb_type == 'stb':
tdSql.checkRows((row_num-i)*tb_num)
for j in range(tb_num):
self.insert_base_data(col_type,f'{tbname}_{j}',row_num,base_data)
def delete_error(self,tbname,column_name,column_type,base_data): def delete_error(self,tbname,column_name,column_type,base_data):
for error_list in ['',f'ts = {self.ts} and',f'ts = {self.ts} or']: for error_list in ['',f'ts = {self.ts} and',f'ts = {self.ts} or']:
if 'binary' in column_type.lower(): if 'binary' in column_type.lower():
...@@ -157,31 +221,56 @@ class TDTestCase: ...@@ -157,31 +221,56 @@ class TDTestCase:
elif 'nchar' in column_type.lower(): elif 'nchar' in column_type.lower():
tdSql.error(f'''delete from {tbname} where {error_list} {column_name} ="{base_data['nchar']}"''') tdSql.error(f'''delete from {tbname} where {error_list} {column_name} ="{base_data['nchar']}"''')
else: else:
tdSql.error('delete from {tbname} where {error_list} {column_name} = {base_data[column_type]}') tdSql.error(f'delete from {tbname} where {error_list} {column_name} = {base_data[column_type]}')
def delete_data_ntb(self): def delete_data_ntb(self):
tdSql.execute(f'create database if not exists {self.dbname}') tdSql.execute(f'create database if not exists {self.dbname}')
tdSql.execute(f'use {self.dbname}') tdSql.execute(f'use {self.dbname}')
for col_name,col_type in self.column_dict.items(): for col_name,col_type in self.column_dict.items():
tdSql.execute(f'create table {self.ntbname} (ts timestamp,{col_name} {col_type})') tdSql.execute(f'create table {self.ntbname} (ts timestamp,{col_name} {col_type})')
self.insert_base_data(col_type,self.ntbname,self.rowNum,self.base_data) self.insert_base_data(col_type,self.ntbname,self.rowNum,self.base_data)
self.delete_one_row(self.ntbname,col_type,col_name,self.base_data,self.dbname) self.delete_one_row(self.ntbname,col_type,col_name,self.base_data,self.rowNum,self.dbname,'ntb')
self.delete_all_data(self.ntbname,col_type,self.rowNum,self.base_data,self.dbname) self.delete_all_data(self.ntbname,col_type,self.rowNum,self.base_data,self.dbname,'ntb')
self.delete_error(self.ntbname,col_name,col_type,self.base_data) self.delete_error(self.ntbname,col_name,col_type,self.base_data)
for i in range(self.rowNum): self.delete_rows(self.dbname,self.ntbname,col_name,col_type,self.base_data,self.rowNum,'ntb')
tdSql.execute(f'delete from {self.ntbname} where ts>{self.ts+i}') for func in ['first','last']:
tdSql.execute(f'flush database {self.dbname}') tdSql.query(f'select {func}(*) from {self.ntbname}')
tdSql.execute('reset query cache')
tdSql.query(f'select {col_name} from {self.ntbname}')
tdSql.checkRows(i+1)
self.insert_base_data(col_type,self.ntbname,self.rowNum,self.base_data)
tdSql.execute(f'drop table {self.ntbname}') tdSql.execute(f'drop table {self.ntbname}')
tdSql.execute(f'drop database {self.dbname}')
def delete_data_ctb(self):
tdSql.execute(f'create database if not exists {self.dbname}')
tdSql.execute(f'use {self.dbname}')
for col_name,col_type in self.column_dict.items():
tdSql.execute(f'create table {self.stbname} (ts timestamp,{col_name} {col_type}) tags(t1 int)')
for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags(1)')
self.insert_base_data(col_type,f'{self.stbname}_{i}',self.rowNum,self.base_data)
self.delete_one_row(f'{self.stbname}_{i}',col_type,col_name,self.base_data,self.rowNum,self.dbname,'ctb')
self.delete_all_data(f'{self.stbname}_{i}',col_type,self.rowNum,self.base_data,self.dbname,'ctb')
self.delete_error(f'{self.stbname}_{i}',col_name,col_type,self.base_data)
self.delete_rows(self.dbname,f'{self.stbname}_{i}',col_name,col_type,self.base_data,self.rowNum,'ctb')
for func in ['first','last']:
tdSql.query(f'select {func}(*) from {self.stbname}_{i}')
tdSql.execute(f'drop table {self.stbname}')
def delete_data_stb(self):
tdSql.execute(f'create database if not exists {self.dbname}')
tdSql.execute(f'use {self.dbname}')
for col_name,col_type in self.column_dict.items():
tdSql.execute(f'create table {self.stbname} (ts timestamp,{col_name} {col_type}) tags(t1 int)')
for i in range(self.tbnum):
tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags(1)')
self.insert_base_data(col_type,f'{self.stbname}_{i}',self.rowNum,self.base_data)
self.delete_error(self.stbname,col_name,col_type,self.base_data)
self.delete_one_row(self.stbname,col_type,col_name,self.base_data,self.rowNum,self.dbname,'stb',self.tbnum)
self.delete_all_data(self.stbname,col_type,self.rowNum,self.base_data,self.dbname,'stb',self.tbnum)
self.delete_rows(self.dbname,self.stbname,col_name,col_type,self.base_data,self.rowNum,'stb',self.tbnum)
for func in ['first','last']:
tdSql.query(f'select {func}(*) from {self.stbname}')
tdSql.execute(f'drop table {self.stbname}')
tdSql.execute(f'drop database {self.dbname}')
def run(self): def run(self):
self.delete_data_ntb() self.delete_data_ntb()
self.delete_data_ctb()
self.delete_data_stb()
def stop(self): def stop(self):
tdSql.close() tdSql.close()
tdLog.success("%s successfully executed" % __file__) tdLog.success("%s successfully executed" % __file__)
......
...@@ -16,6 +16,7 @@ from tmqCommon import * ...@@ -16,6 +16,7 @@ from tmqCommon import *
class TDTestCase: class TDTestCase:
def __init__(self): def __init__(self):
self.snapshot = 0
self.vgroups = 4 self.vgroups = 4
self.ctbNum = 1000 self.ctbNum = 1000
self.rowsPerTbl = 1000 self.rowsPerTbl = 1000
...@@ -44,7 +45,7 @@ class TDTestCase: ...@@ -44,7 +45,7 @@ class TDTestCase:
'pollDelay': 3, 'pollDelay': 3,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 1} 'snapshot': 0}
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
...@@ -84,13 +85,14 @@ class TDTestCase: ...@@ -84,13 +85,14 @@ class TDTestCase:
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 1000, 'ctbNum': 1000,
'rowsPerTbl': 1000, 'rowsPerTbl': 1000,
'batchNum': 400, 'batchNum': 1000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5, 'pollDelay': 5,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 1} 'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
...@@ -131,10 +133,10 @@ class TDTestCase: ...@@ -131,10 +133,10 @@ class TDTestCase:
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsFromQuery = tdSql.getRows()
if totalConsumeRows != totalRowsInserted: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery))
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
...@@ -163,6 +165,7 @@ class TDTestCase: ...@@ -163,6 +165,7 @@ class TDTestCase:
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
...@@ -180,12 +183,13 @@ class TDTestCase: ...@@ -180,12 +183,13 @@ class TDTestCase:
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) # queryString = "select ts, c1, c2 from %s.%s "%(paraDict['dbName'], paraDict['stbName'])
queryString = "select ts, c1, c2 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicFromStb1, queryString) sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
consumerId = 0 consumerId = 1
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 0 ifcheckdata = 0
...@@ -210,10 +214,10 @@ class TDTestCase: ...@@ -210,10 +214,10 @@ class TDTestCase:
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsFromQuery = tdSql.getRows()
if totalConsumeRows != totalRowsInserted: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery))
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
...@@ -222,10 +226,18 @@ class TDTestCase: ...@@ -222,10 +226,18 @@ class TDTestCase:
def run(self): def run(self):
tdSql.prepare()
self.prepareTestEnv() self.prepareTestEnv()
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.tmqCase1()
self.tmqCase2()
self.prepareTestEnv()
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.snapshot = 1
self.tmqCase1() self.tmqCase1()
# self.tmqCase2() # TD-17267 self.tmqCase2()
def stop(self): def stop(self):
......
...@@ -16,6 +16,7 @@ from tmqCommon import * ...@@ -16,6 +16,7 @@ from tmqCommon import *
class TDTestCase: class TDTestCase:
def __init__(self): def __init__(self):
self.snapshot = 0
self.vgroups = 2 self.vgroups = 2
self.ctbNum = 100 self.ctbNum = 100
self.rowsPerTbl = 10000 self.rowsPerTbl = 10000
...@@ -37,15 +38,16 @@ class TDTestCase: ...@@ -37,15 +38,16 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb', 'ctbPrefix': 'ctb',
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 500, 'ctbNum': 100,
'rowsPerTbl': 1000, 'rowsPerTbl': 10000,
'batchNum': 500, 'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3, 'pollDelay': 3,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 0} 'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['rowsPerTbl'] = self.rowsPerTbl
...@@ -81,30 +83,31 @@ class TDTestCase: ...@@ -81,30 +83,31 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb', 'ctbPrefix': 'ctb',
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 1000, 'ctbNum': 100,
'rowsPerTbl': 1000, 'rowsPerTbl': 10000,
'batchNum': 400, 'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5, 'pollDelay': 5,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 1} 'snapshot': 0}
# paraDict['vgroups'] = self.vgroups paraDict['snapshot'] = self.snapshot
# paraDict['ctbNum'] = self.ctbNum paraDict['vgroups'] = self.vgroups
# paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable() # tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb") # tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) # tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb") # tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], # tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) # ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data") # tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
...@@ -132,7 +135,7 @@ class TDTestCase: ...@@ -132,7 +135,7 @@ class TDTestCase:
tdLog.info("================= restart dnode ===========================") tdLog.info("================= restart dnode ===========================")
tdDnodes.stop(1) tdDnodes.stop(1)
tdDnodes.start(1) tdDnodes.start(1)
time.sleep(5) time.sleep(3)
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
expectRows = 1 expectRows = 1
...@@ -142,10 +145,10 @@ class TDTestCase: ...@@ -142,10 +145,10 @@ class TDTestCase:
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsFromQury = tdSql.getRows()
if totalConsumeRows != totalRowsInserted: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQury))
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) if totalConsumeRows != totalRowsFromQury:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
...@@ -165,30 +168,31 @@ class TDTestCase: ...@@ -165,30 +168,31 @@ class TDTestCase:
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb', 'ctbPrefix': 'ctb',
'ctbStartIdx': 0, 'ctbStartIdx': 0,
'ctbNum': 1000, 'ctbNum': 100,
'rowsPerTbl': 1000, 'rowsPerTbl': 10000,
'batchNum': 1000, 'batchNum': 3000,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5, 'pollDelay': 5,
'showMsg': 1, 'showMsg': 1,
'showRow': 1, 'showRow': 1,
'snapshot': 1} 'snapshot': 0}
# paraDict['vgroups'] = self.vgroups paraDict['snapshot'] = self.snapshot
# paraDict['ctbNum'] = self.ctbNum paraDict['vgroups'] = self.vgroups
# paraDict['rowsPerTbl'] = self.rowsPerTbl paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable() tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1) # tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb") # tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) # tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb") # tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], # tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) # ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data") # tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], # tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], # ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) # startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("create topics from stb1") tdLog.info("create topics from stb1")
topicFromStb1 = 'topic_stb1' topicFromStb1 = 'topic_stb1'
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName']) queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
...@@ -196,29 +200,29 @@ class TDTestCase: ...@@ -196,29 +200,29 @@ class TDTestCase:
tdLog.info("create topic sql: %s"%sqlString) tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString) tdSql.execute(sqlString)
consumerId = 0 consumerId = 1
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + 100000
topicList = topicFromStb1 topicList = topicFromStb1
ifcheckdata = 0 ifcheckdata = 0
ifManualCommit = 0 ifManualCommit = 0
keyList = 'group.id:cgrp1,\ keyList = 'group.id:cgrp1,\
enable.auto.commit:true,\ enable.auto.commit:true,\
auto.commit.interval.ms:1000,\ auto.commit.interval.ms:3000,\
auto.offset.reset:earliest' auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor") tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("create some new child table and insert data ")
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tmqCom.getStartCommitNotifyFromTmqsim() tmqCom.getStartCommitNotifyFromTmqsim()
tdLog.info("================= restart dnode ===========================") tdLog.info("================= restart dnode ===========================")
tdDnodes.stop(1) tdDnodes.stop(1)
tdDnodes.start(1) tdDnodes.start(1)
time.sleep(5) time.sleep(3)
tdLog.info("create some new child table and insert data ")
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
tdLog.info("insert process end, and start to check consume result") tdLog.info("insert process end, and start to check consume result")
expectRows = 1 expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows) resultList = tmqCom.selectConsumeResult(expectRows)
...@@ -227,10 +231,10 @@ class TDTestCase: ...@@ -227,10 +231,10 @@ class TDTestCase:
totalConsumeRows += resultList[i] totalConsumeRows += resultList[i]
tdSql.query(queryString) tdSql.query(queryString)
totalRowsInserted = tdSql.getRows() totalRowsFromQuery = tdSql.getRows()
if totalConsumeRows != totalRowsInserted: tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsFromQuery))
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, totalRowsInserted)) if totalConsumeRows != totalRowsFromQuery:
tdLog.exit("tmq consume rows error!") tdLog.exit("tmq consume rows error!")
tdSql.query("drop topic %s"%topicFromStb1) tdSql.query("drop topic %s"%topicFromStb1)
...@@ -239,8 +243,8 @@ class TDTestCase: ...@@ -239,8 +243,8 @@ class TDTestCase:
def run(self): def run(self):
tdSql.prepare() tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1() # self.tmqCase1()
self.tmqCase2() self.tmqCase2()
def stop(self): def stop(self):
......
...@@ -33,7 +33,7 @@ python3 ./test.py -f 1-insert/create_retentions.py ...@@ -33,7 +33,7 @@ python3 ./test.py -f 1-insert/create_retentions.py
python3 ./test.py -f 1-insert/table_param_ttl.py python3 ./test.py -f 1-insert/table_param_ttl.py
python3 ./test.py -f 1-insert/update_data.py python3 ./test.py -f 1-insert/update_data.py
python3 ./test.py -f 1-insert/delete_data.py
python3 ./test.py -f 2-query/db.py python3 ./test.py -f 2-query/db.py
python3 ./test.py -f 2-query/between.py python3 ./test.py -f 2-query/between.py
...@@ -184,7 +184,7 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py ...@@ -184,7 +184,7 @@ python3 ./test.py -f 7-tmq/tmqConsFromTsdb-mutilVg-mutilCtb.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb-funcNFilter.py
python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py
python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py #python3 ./test.py -f 7-tmq/tmqAutoCreateTbl.py
#python3 ./test.py -f 7-tmq/tmqDnodeRestart.py #python3 ./test.py -f 7-tmq/tmqDnodeRestart.py
python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py python3 ./test.py -f 7-tmq/tmqUpdate-1ctb.py
python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py python3 ./test.py -f 7-tmq/tmqUpdate-multiCtb-snapshot0.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册