提交 65871c2d 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11274-3.0

......@@ -586,7 +586,7 @@ int32_t tdSTSRowNew(SArray *pArray, STSchema *pTSchema, STSRow **ppRow) {
ASSERT(pTColumn->colId == PRIMARYKEY_TIMESTAMP_COL_ID);
} else {
if (IS_VAR_DATA_TYPE(pTColumn->type)) {
if (pColVal) {
if (pColVal && !pColVal->isNone && !pColVal->isNull) {
varDataLen += (pColVal->value.nData + sizeof(VarDataLenT));
if (maxVarDataLen < (pColVal->value.nData + sizeof(VarDataLenT))) {
maxVarDataLen = pColVal->value.nData + sizeof(VarDataLenT);
......
......@@ -108,6 +108,10 @@ typedef struct {
// exec
STqExecHandle execHandle;
// prevent drop
int64_t ntbUid;
SArray* colIdList; // SArray<int32_t>
} STqHandle;
struct STQ {
......
......@@ -142,6 +142,7 @@ void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqCommit(STQ*);
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
int32_t tqCheckColModifiable(STQ* pTq, int32_t colId);
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen);
......
......@@ -208,6 +208,26 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) {
return 0;
}
int32_t tqCheckColModifiable(STQ* pTq, int32_t colId) {
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->handles, pIter);
if (pIter == NULL) break;
STqHandle* pExec = (STqHandle*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
int32_t sz = taosArrayGetSize(pExec->colIdList);
for (int32_t i = 0; i < sz; i++) {
int32_t forbidColId = *(int32_t*)taosArrayGet(pExec->colIdList, i);
if (forbidColId == colId) {
taosHashCancelIterate(pTq->handles, pIter);
return -1;
}
}
}
}
return 0;
}
static int32_t tqInitDataRsp(SMqDataRsp* pRsp, const SMqPollReq* pReq, int8_t subType) {
pRsp->reqOffset = pReq->reqOffset;
......@@ -752,8 +772,9 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
if (ppTask) {
SStreamTask* pTask = *ppTask;
taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
atomic_store_8(&(*ppTask)->taskStatus, TASK_STATUS__DROPPING);
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
}
// todo
// clear queue
......
......@@ -3957,7 +3957,7 @@ static SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
doFilter(pIndefInfo->pCondition, pInfo->pRes);
size_t rows = pInfo->pRes->info.rows;
if (rows >= 0) {
if (rows > 0 || pOperator->status == OP_EXEC_DONE) {
break;
}
}
......
......@@ -1519,6 +1519,7 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
blockDataDestroy(pStreamScan->pPullDataRes);
blockDataDestroy(pStreamScan->pDeleteDataRes);
taosArrayDestroy(pStreamScan->pBlockLists);
taosArrayDestroy(pStreamScan->tsArray);
taosMemoryFree(pStreamScan);
}
......
......@@ -257,6 +257,22 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode);
int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader);
int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry);
// trace log
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s);
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s);
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s);
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s);
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s);
// for debug --------------
void syncNodePrint(SSyncNode* pObj);
void syncNodePrint2(char* s, SSyncNode* pObj);
......
......@@ -92,13 +92,10 @@
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0;
// print log
syncAppendEntriesLog2("==syncNodeOnAppendEntriesCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries, maybe replica already dropped");
return ret;
syncLogRecvAppendEntries(ths, pMsg, "maybe replica already dropped");
return -1;
}
// maybe update term
......@@ -114,17 +111,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
}
ASSERT(pMsg->dataLen >= 0);
do {
// return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
syncNodeEventLog(ths, "recv sync-append-entries, candidate to follower");
syncNodeBecomeFollower(ths, "from candidate by append entries");
// ret or reply?
return ret;
}
} while (0);
// return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
syncLogRecvAppendEntries(ths, pMsg, "candidate to follower");
syncNodeBecomeFollower(ths, "from candidate by append entries");
return -1; // ret or reply?
}
SyncTerm localPreLogTerm = 0;
if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) {
......@@ -148,13 +140,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// reject request
if ((pMsg->term < ths->pRaftStore->currentTerm) ||
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) {
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries, reject, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
syncLogRecvAppendEntries(ths, pMsg, "reject");
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
......@@ -164,14 +150,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
pReply->matchIndex = SYNC_INDEX_INVALID;
// msg event log
do {
char host[128];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
} while (0);
syncLogSendAppendEntriesReply(ths, pReply, "");
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
......@@ -192,13 +171,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0;
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries, accept, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
syncLogRecvAppendEntries(ths, pMsg, "accept");
if (hasExtraEntries && hasAppendEntries) {
// not conflict by default
......@@ -348,14 +321,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
}
// msg event log
do {
char host[128];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
} while (0);
syncLogSendAppendEntriesReply(ths, pReply, "");
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
......@@ -558,8 +524,8 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries-batch, maybe replica already dropped");
return ret;
syncLogRecvAppendEntriesBatch(ths, pMsg, "maybe replica already dropped");
return -1;
}
// maybe update term
......@@ -582,15 +548,13 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
do {
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
if (condition) {
syncNodeEventLog(ths, "recv sync-append-entries-batch, candidate to follower");
syncLogRecvAppendEntriesBatch(ths, pMsg, "candidate to follower");
syncNodeBecomeFollower(ths, "from candidate by append entries");
// do not reply?
return ret;
return 0; // do not reply?
}
} while (0);
// fake match2
// fake match
//
// condition1:
// preIndex <= my commit index
......@@ -602,14 +566,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
(pMsg->prevLogIndex <= ths->commitIndex);
if (condition) {
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch, fake match2, {pre-index:%" PRId64 ", pre-term:%" PRIu64
", datalen:%d, datacount:%d}",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
syncNodeEventLog(ths, logBuf);
} while (0);
syncLogRecvAppendEntriesBatch(ths, pMsg, "fake match");
SyncIndex matchIndex = ths->commitIndex;
bool hasAppendEntries = pMsg->dataLen > 0;
......@@ -662,14 +619,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->matchIndex = matchIndex;
// msg event log
do {
char host[128];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
} while (0);
syncLogSendAppendEntriesReply(ths, pReply, "");
// send response
SRpcMsg rpcMsg;
......@@ -702,14 +652,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
bool condition = condition1 || condition2;
if (condition) {
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch, not match, {pre-index:%" PRId64 ", pre-term:%" PRIu64
", datalen:%d, datacount:%d}",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
syncNodeEventLog(ths, logBuf);
} while (0);
syncLogRecvAppendEntriesBatch(ths, pMsg, "not match");
// maybe update commit index by snapshot
syncNodeMaybeUpdateCommitBySnapshot(ths);
......@@ -724,14 +667,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->matchIndex = ths->commitIndex;
// msg event log
do {
char host[128];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
} while (0);
syncLogSendAppendEntriesReply(ths, pReply, "");
// send response
SRpcMsg rpcMsg;
......@@ -762,14 +698,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
bool hasAppendEntries = pMsg->dataLen > 0;
SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg);
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch, match, {pre-index:%" PRId64 ", pre-term:%" PRIu64
", datalen:%d, datacount:%d}",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen, pMsg->dataCount);
syncNodeEventLog(ths, logBuf);
} while (0);
syncLogRecvAppendEntriesBatch(ths, pMsg, "really match");
if (hasExtraEntries) {
// make log same, rollback deleted entries
......@@ -808,14 +737,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex;
// msg event log
do {
char host[128];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
} while (0);
syncLogSendAppendEntriesReply(ths, pReply, "");
// send response
SRpcMsg rpcMsg;
......@@ -866,13 +788,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
int32_t ret = 0;
int32_t code = 0;
// print log
syncAppendEntriesLog2("==syncNodeOnAppendEntriesSnapshotCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries, maybe replica already dropped");
return ret;
syncLogRecvAppendEntries(ths, pMsg, "maybe replica already dropped");
return -1;
}
// maybe update term
......@@ -895,11 +814,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
do {
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
if (condition) {
syncNodeEventLog(ths, "recv sync-append-entries, candidate to follower");
syncLogRecvAppendEntries(ths, pMsg, "candidate to follower");
syncNodeBecomeFollower(ths, "from candidate by append entries");
// do not reply?
return ret;
return 0; // do not reply?
}
} while (0);
......@@ -962,7 +879,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
} while (0);
#endif
// fake match2
// fake match
//
// condition1:
// preIndex <= my commit index
......@@ -975,13 +892,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
(pMsg->prevLogIndex <= ths->commitIndex);
if (condition) {
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries, fake match2, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
syncLogRecvAppendEntries(ths, pMsg, "fake match");
SyncIndex matchIndex = ths->commitIndex;
bool hasAppendEntries = pMsg->dataLen > 0;
......@@ -1027,14 +938,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->matchIndex = matchIndex;
// msg event log
do {
char host[128];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
} while (0);
syncLogSendAppendEntriesReply(ths, pReply, "");
// send response
SRpcMsg rpcMsg;
......@@ -1067,11 +971,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool condition = condition1 || condition2;
if (condition) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries, not match, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
syncLogRecvAppendEntries(ths, pMsg, "not match");
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
......@@ -1083,14 +983,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->matchIndex = SYNC_INDEX_INVALID;
// msg event log
do {
char host[128];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
} while (0);
syncLogSendAppendEntriesReply(ths, pReply, "");
// send response
SRpcMsg rpcMsg;
......@@ -1120,11 +1013,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0;
char logBuf[128];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries, match, pre-index:%" PRId64 ", pre-term:%" PRIu64 ", datalen:%d",
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
syncLogRecvAppendEntries(ths, pMsg, "really match");
if (hasExtraEntries) {
// make log same, rollback deleted entries
......@@ -1159,14 +1048,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex;
// msg event log
do {
char host[128];
uint16_t port;
syncUtilU642Addr(pReply->destId.addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64
", success:%d, match-index:%" PRId64 "}",
ths->vgId, host, port, pReply->term, pReply->privateTerm, pReply->success, pReply->matchIndex);
} while (0);
syncLogSendAppendEntriesReply(ths, pReply, "");
// send response
SRpcMsg rpcMsg;
......
......@@ -40,44 +40,33 @@
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
// print log
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplyCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
return 0;
syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped");
return -1;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%" PRIu64 ", drop stale response",
pMsg->term);
syncNodeEventLog(ths, logBuf);
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
return 0;
}
if (gRaftDetailLog) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, before");
}
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex);
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%" PRIu64, pMsg->term);
syncNodeErrorLog(ths, logBuf);
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
return -1;
}
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
if (pMsg->success) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
......@@ -100,13 +89,16 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
}
if (gRaftDetailLog) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, after");
}
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex);
SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
do {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex,
beforeMatchIndex, afterNextIndex, afterMatchIndex);
syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
} while (0);
return ret;
return 0;
}
// only start once
......@@ -147,40 +139,29 @@ static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, Sync
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
// print log
do {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, term:%lu, match:%ld, success:%d", pMsg->term,
pMsg->matchIndex, pMsg->success);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped");
return -1;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%" PRIu64 ", drop stale response",
pMsg->term);
syncNodeEventLog(ths, logBuf);
return -1;
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
return 0;
}
// error term
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%" PRIu64, pMsg->term);
syncNodeErrorLog(ths, logBuf);
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
return -1;
}
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
if (pMsg->success) {
SyncIndex newNextIndex = pMsg->matchIndex + 1;
SyncIndex newMatchIndex = pMsg->matchIndex;
......@@ -293,50 +274,48 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
} while (0);
}
SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
do {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex,
beforeMatchIndex, afterNextIndex, afterMatchIndex);
syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
} while (0);
return 0;
}
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
int32_t ret = 0;
// print log
syncAppendEntriesReplyLog2("==syncNodeOnAppendEntriesReplySnapshotCb==", pMsg);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
return 0;
syncLogRecvAppendEntriesReply(ths, pMsg, "maybe replica already dropped");
return -1;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%" PRIu64 ", drop stale response",
pMsg->term);
syncNodeEventLog(ths, logBuf);
syncLogRecvAppendEntriesReply(ths, pMsg, "drop stale response");
return 0;
}
if (gRaftDetailLog) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, before");
}
syncIndexMgrLog2("recv sync-append-entries-reply, before pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv sync-append-entries-reply, before pMatchIndex:", ths->pMatchIndex);
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%" PRIu64, pMsg->term);
syncNodeErrorLog(ths, logBuf);
syncLogRecvAppendEntriesReply(ths, pMsg, "error term");
return -1;
}
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
SyncIndex beforeNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex beforeMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
if (pMsg->success) {
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
......@@ -404,11 +383,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
}
}
if (gRaftDetailLog) {
syncNodeEventLog(ths, "recv sync-append-entries-reply, after");
}
syncIndexMgrLog2("recv sync-append-entries-reply, after pNextIndex:", ths->pNextIndex);
syncIndexMgrLog2("recv sync-append-entries-reply, after pMatchIndex:", ths->pMatchIndex);
SyncIndex afterNextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
SyncIndex afterMatchIndex = syncIndexMgrGetIndex(ths->pMatchIndex, &(pMsg->srcId));
do {
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "before next:%ld, match:%ld, after next:%ld, match:%ld", beforeNextIndex,
beforeMatchIndex, afterNextIndex, afterMatchIndex);
syncLogRecvAppendEntriesReply(ths, pMsg, logBuf);
} while (0);
return 0;
}
\ No newline at end of file
......@@ -120,18 +120,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg) {
int32_t ret = 0;
do {
char host[64];
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "", host, port,
pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(pSyncNode, logBuf);
} while (0);
syncLogSendRequestVote(pSyncNode, pMsg, "");
SRpcMsg rpcMsg;
syncRequestVote2RpcMsg(pMsg, &rpcMsg);
......
......@@ -79,8 +79,7 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaf
}
}
syncNodeLog3("syncIndexMgrGetIndex", pSyncIndexMgr->pSyncNode);
ASSERT(0);
return SYNC_INDEX_INVALID;
}
cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
......@@ -126,7 +125,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......
......@@ -1564,7 +1564,8 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
snprintf(logBuf, sizeof(logBuf), "%s", str);
}
// sDebug("%s", logBuf);
sInfo("%s", logBuf);
// sInfo("%s", logBuf);
sTrace("%s", logBuf);
} else {
int len = 256 + userStrLen;
......@@ -1586,7 +1587,8 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
snprintf(s, len, "%s", str);
}
// sDebug("%s", s);
sInfo("%s", s);
// sInfo("%s", s);
sTrace("%s", s);
taosMemoryFree(s);
}
......@@ -2923,4 +2925,126 @@ bool syncNodeCanChange(SSyncNode* pSyncNode) {
}
return true;
}
\ No newline at end of file
}
void syncLogSendRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-request-vote to %s:%d {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s", host, port,
pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvRequestVote(SSyncNode* pSyncNode, const SyncRequestVote* pMsg, const char* s) {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64 "}, %s", host,
port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "send sync-request-vote-reply to %s:%d {term:%" PRIu64 ", grant:%d}, %s", host, port,
pMsg->term, pMsg->voteGranted, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvRequestVoteReply(SSyncNode* pSyncNode, const SyncRequestVoteReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, %s", host,
port, pMsg->term, pMsg->voteGranted, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogSendAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64
", "
"datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvAppendEntries(SSyncNode* pSyncNode, const SyncAppendEntries* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries from %s:%d {term:%" PRIu64 ", pre-index:%" PRIu64 ", pre-term:%" PRIu64
", commit:%" PRIu64 ", pterm:%" PRIu64
", "
"datalen:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->commitIndex, pMsg->privateTerm,
pMsg->dataLen, s);
syncNodeErrorLog(pSyncNode, logBuf);
}
void syncLogSendAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, pMsg->dataCount, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvAppendEntriesBatch(SSyncNode* pSyncNode, const SyncAppendEntriesBatch* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-batch from %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, count:%d}, %s",
host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm, pMsg->commitIndex,
pMsg->dataLen, pMsg->dataCount, s);
syncNodeErrorLog(pSyncNode, logBuf);
}
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->destId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"send sync-append-entries-reply to %s:%d, {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
syncNodeEventLog(pSyncNode, logBuf);
}
void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-append-entries-reply from %s:%d {term:%" PRIu64 ", pterm:%" PRIu64 ", success:%d, match:%" PRId64
"}, %s",
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->matchIndex, s);
syncNodeErrorLog(pSyncNode, logBuf);
}
......@@ -108,10 +108,10 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) {
cJSON *pRoot = cJSON_CreateObject();
char u64Buf[128] = {0};
snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->currentTerm);
snprintf(u64Buf, sizeof(u64Buf), "" PRIu64 "", pRaftStore->currentTerm);
cJSON_AddStringToObject(pRoot, "current_term", u64Buf);
snprintf(u64Buf, sizeof(u64Buf), "%lu", pRaftStore->voteFor.addr);
snprintf(u64Buf, sizeof(u64Buf), "" PRIu64 "", pRaftStore->voteFor.addr);
cJSON_AddStringToObject(pRoot, "vote_for_addr", u64Buf);
cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId);
......@@ -142,11 +142,11 @@ int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) {
cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term");
ASSERT(cJSON_IsString(pCurrentTerm));
sscanf(pCurrentTerm->valuestring, "%lu", &(pRaftStore->currentTerm));
sscanf(pCurrentTerm->valuestring, "" PRIu64 "", &(pRaftStore->currentTerm));
cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr");
ASSERT(cJSON_IsString(pVoteForAddr));
sscanf(pVoteForAddr->valuestring, "%lu", &(pRaftStore->voteFor.addr));
sscanf(pVoteForAddr->valuestring, "" PRIu64 "", &(pRaftStore->voteFor.addr));
cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid");
pRaftStore->voteFor.vgId = pVoteForVgid->valueint;
......@@ -188,11 +188,11 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
cJSON *pRoot = cJSON_CreateObject();
if (pRaftStore != NULL) {
snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->currentTerm);
snprintf(u64buf, sizeof(u64buf), "" PRIu64 "", pRaftStore->currentTerm);
cJSON_AddStringToObject(pRoot, "currentTerm", u64buf);
cJSON *pVoteFor = cJSON_CreateObject();
snprintf(u64buf, sizeof(u64buf), "%lu", pRaftStore->voteFor.addr);
snprintf(u64buf, sizeof(u64buf), "" PRIu64 "", pRaftStore->voteFor.addr);
cJSON_AddStringToObject(pVoteFor, "addr", u64buf);
{
uint64_t u64 = pRaftStore->voteFor.addr;
......@@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
char *raftStore2Str(SRaftStore *pRaftStore) {
cJSON *pJson = raftStore2Json(pRaftStore);
char * serialized = cJSON_Print(pJson);
char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson);
return serialized;
}
......@@ -224,25 +224,25 @@ char *raftStore2Str(SRaftStore *pRaftStore) {
// for debug -------------------
void raftStorePrint(SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
printf("raftStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
printf("raftStorePrint | len:%" PRIu64 " | %s \n", strlen(serialized), serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void raftStorePrint2(char *s, SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
printf("raftStorePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
printf("raftStorePrint2 | len:%" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
taosMemoryFree(serialized);
}
void raftStoreLog(SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
sTrace("raftStoreLog | len:%lu | %s", strlen(serialized), serialized);
sTrace("raftStoreLog | len:%" PRIu64 " | %s", strlen(serialized), serialized);
taosMemoryFree(serialized);
}
void raftStoreLog2(char *s, SRaftStore *pObj) {
char *serialized = raftStore2Str(pObj);
sTrace("raftStoreLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
sTrace("raftStoreLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized);
}
......@@ -313,18 +313,7 @@ int32_t syncNodeReplicate(SSyncNode* pSyncNode) {
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg) {
int32_t ret = 0;
do {
char host[128];
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64 ", pre-term:%" PRIu64
", pterm:%" PRIu64 ", commit:%" PRId64
", "
"datalen:%d}",
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
pMsg->commitIndex, pMsg->dataLen);
} while (0);
syncLogSendAppendEntries(pSyncNode, pMsg, "");
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
......@@ -334,15 +323,7 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId,
const SyncAppendEntriesBatch* pMsg) {
do {
char host[128];
uint16_t port;
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
sDebug("vgId:%d, send sync-append-entries-batch to %s:%d, {term:%" PRIu64 ", pre-index:%" PRId64
", pre-term:%" PRIu64 ", pterm:%" PRIu64 ", commit:%" PRId64 ", datalen:%d, datacount:%d}",
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount);
} while (0);
syncLogSendAppendEntriesBatch(pSyncNode, pMsg, "");
SRpcMsg rpcMsg;
syncAppendEntriesBatch2RpcMsg(pMsg, &rpcMsg);
......
......@@ -47,18 +47,7 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
do {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
"}, maybe replica already dropped",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(ths, logBuf);
} while (0);
syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped");
return -1;
}
......@@ -91,15 +80,10 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) {
// trace log
do {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
"}, reply-grant:%d",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
syncNodeEventLog(ths, logBuf);
char logBuf[32];
snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted);
syncLogRecvRequestVote(ths, pMsg, logBuf);
syncLogSendRequestVoteReply(ths, pReply, "");
} while (0);
SRpcMsg rpcMsg;
......@@ -212,18 +196,7 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
do {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
"}, maybe replica already dropped",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm);
syncNodeEventLog(ths, logBuf);
} while (0);
syncLogRecvRequestVote(ths, pMsg, "maybe replica already dropped");
return -1;
}
......@@ -254,15 +227,10 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) {
// trace log
do {
char logBuf[256];
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote from %s:%d, {term:%" PRIu64 ", lindex:%" PRId64 ", lterm:%" PRIu64
"}, reply-grant:%d",
host, port, pMsg->term, pMsg->lastLogIndex, pMsg->lastLogTerm, pReply->voteGranted);
syncNodeEventLog(ths, logBuf);
char logBuf[32];
snprintf(logBuf, sizeof(logBuf), "grant:%d", pReply->voteGranted);
syncLogRecvRequestVote(ths, pMsg, logBuf);
syncLogSendRequestVoteReply(ths, pReply, "");
} while (0);
SRpcMsg rpcMsg;
......
......@@ -40,40 +40,15 @@
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0;
// trace log
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host,
port, pMsg->term, pMsg->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
syncLogRecvRequestVoteReply(ths, pMsg, "maybe replica already dropped");
return -1;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
return -1;
}
......@@ -84,16 +59,11 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
host, port, pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
syncLogRecvRequestVoteReply(ths, pMsg, "error term");
return -1;
}
syncLogRecvRequestVoteReply(ths, pMsg, "");
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
// This tallies votes even when the current state is not Candidate,
......@@ -185,40 +155,15 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) {
int32_t ret = 0;
// trace log
do {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d} ", host,
port, pMsg->term, pMsg->voteGranted);
syncNodeEventLog(ths, logBuf);
} while (0);
// if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, maybe replica dropped", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
syncLogRecvRequestVoteReply(ths, pMsg, "maybe replica already dropped");
return -1;
}
// drop stale response
if (pMsg->term < ths->pRaftStore->currentTerm) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, drop stale response", host, port,
pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
syncLogRecvRequestVoteReply(ths, pMsg, "drop stale response");
return -1;
}
......@@ -229,16 +174,11 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
// }
if (pMsg->term > ths->pRaftStore->currentTerm) {
char host[64];
uint16_t port;
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
char logBuf[256];
snprintf(logBuf, sizeof(logBuf), "recv sync-request-vote-reply from %s:%d {term:%" PRIu64 ", grant:%d}, error term",
host, port, pMsg->term, pMsg->voteGranted);
syncNodeErrorLog(ths, logBuf);
syncLogRecvRequestVoteReply(ths, pMsg, "error term");
return -1;
}
syncLogRecvRequestVoteReply(ths, pMsg, "");
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
// This tallies votes even when the current state is not Candidate,
......
......@@ -300,7 +300,7 @@ class TDDnode:
if self.valgrind == 0:
if platform.system().lower() == 'windows':
cmd = "mintty -h never -w hide %s -c %s" % (
cmd = "mintty -h never %s -c %s" % (
binPath, self.cfgDir)
else:
cmd = "nohup %s -c %s > /dev/null 2>&1 & " % (
......@@ -309,7 +309,7 @@ class TDDnode:
valgrindCmdline = "valgrind --log-file=\"%s/../log/valgrind.log\" --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"%self.cfgDir
if platform.system().lower() == 'windows':
cmd = "mintty -h never -w hide %s %s -c %s" % (
cmd = "mintty -h never %s %s -c %s" % (
valgrindCmdline, binPath, self.cfgDir)
else:
cmd = "nohup %s %s -c %s 2>&1 & " % (
......@@ -521,7 +521,7 @@ class TDDnode:
if self.running != 0:
if platform.system().lower() == 'windows':
psCmd = "for /f %a in ('wmic process where \"name='taosd.exe' and CommandLine like '%%dnode%d%%'\" get processId ^| xargs echo ^| awk ^'{print $2}^'') do @(ps | grep %a | awk '{print $1}' | xargs kill -INT )" % (self.index)
psCmd = "for /f %%a in ('wmic process where \"name='taosd.exe' and CommandLine like '%%dnode%d%%'\" get processId ^| xargs echo ^| awk ^'{print $2}^'') do @(ps | grep %%a | awk '{print $1}' | xargs kill -INT )" % (self.index)
else:
psCmd = "ps -ef|grep -w %s| grep dnode%d|grep -v grep | awk '{print $2}'" % (toBeKilled,self.index)
processID = subprocess.check_output(
......
......@@ -154,19 +154,19 @@
# ./test.sh -f tsim/parser/set_tag_vals.sim
# ./test.sh -f tsim/parser/single_row_in_tb.sim
# ./test.sh -f tsim/parser/sliding.sim
# ./test.sh -f tsim/parser/slimit_alter_tags.sim
# ./test.sh -f tsim/parser/slimit.sim
# ./test.sh -f tsim/parser/slimit1.sim
# ./test.sh -f tsim/parser/slimit_alter_tags.sim
# ./test.sh -f tsim/parser/stableOp.sim
./test.sh -f tsim/parser/stableOp.sim
# ./test.sh -f tsim/parser/tags_dynamically_specifiy.sim
# ./test.sh -f tsim/parser/tags_filter.sim
# ./test.sh -f tsim/parser/tbnameIn.sim
# ./test.sh -f tsim/parser/timestamp.sim
## ./test.sh -f tsim/parser/top_groupby.sim
# ./test.sh -f tsim/parser/topbot.sim
# ./test.sh -f tsim/parser/udf.sim
# ./test.sh -f tsim/parser/udf_dll.sim
# jira ./test.sh -f tsim/parser/tbnameIn.sim
./test.sh -f tsim/parser/timestamp.sim
./test.sh -f tsim/parser/top_groupby.sim
./test.sh -f tsim/parser/topbot.sim
# ./test.sh -f tsim/parser/udf_dll_stable.sim
# ./test.sh -f tsim/parser/udf_dll.sim
# ./test.sh -f tsim/parser/udf.sim
# ./test.sh -f tsim/parser/union.sim
# ./test.sh -f tsim/parser/where.sim
......
......@@ -476,26 +476,26 @@ if $rows != 100 then
return -1
endi
sql select sum(c2),c8,avg(c2), sum(c2)/count(*) from group_mt0 partition by c8 order by c8 slimit 2 soffset 99
sql select sum(c2),c8,avg(c2), sum(c2)/count(*) from group_mt0 partition by c8 slimit 2 soffset 99
if $rows != 1 then
return -1
endi
if $data00 != 79200.000000000 then
return -1
endi
#if $data00 != 79200.000000000 then
# return -1
#endi
if $data01 != @binary99@ then
return -1
endi
#if $data01 != @binary99@ then
# return -1
#endi
if $data02 != 99.000000000 then
return -1
endi
#if $data02 != 99.000000000 then
# return -1
#endi
if $data03 != 99.000000000 then
return -1
endi
#if $data03 != 99.000000000 then
# return -1
#endi
print ============>td-1765
sql select percentile(c4, 49),min(c4),max(c4),avg(c4),stddev(c4) from group_tb0 group by c8 order by c8;
......
......@@ -17,7 +17,7 @@ $db = $dbPrefix . $i
$stb = $stbPrefix . $i
sql drop database if exists $db
sql create database $db maxrows 200 cache 16
sql create database $db maxrows 200
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 binary(15), t2 int, t3 bigint, t4 nchar(10), t5 double, t6 bool)
......@@ -59,7 +59,7 @@ print ====== $db tables created
$db = $dbPrefix . 1
sql drop database if exists $db
sql create database $db maxrows 200 cache 16
sql create database $db maxrows 200
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 binary(15), t2 int, t3 bigint, t4 nchar(10), t5 double, t6 bool)
......@@ -93,11 +93,9 @@ run tsim/parser/slimit_query.sim
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 500
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 100
run tsim/parser/slimit_query.sim
......
......@@ -62,7 +62,7 @@ sql_error insert into $tb values (now, 1, 2.0);
sql alter stable $stb add tag tag2 int;
sql alter stable $stb change tag tag2 tag3;
sql alter stable $stb rename tag tag2 tag3;
sql_error drop stable $tb
......@@ -85,7 +85,7 @@ print create/alter/drop stable test passed
sql drop database $db
sql show databases
if $rows != 0 then
if $rows != 2 then
return -1
endi
......
......@@ -64,7 +64,6 @@ run tsim/parser/tbnameIn_query.sim
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 500
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
......
sleep 100
sql connect
$dbPrefix = ti_db
......@@ -27,10 +26,11 @@ sql use $db
sql select count(*) from $stb where tbname in ('ti_tb1', 'ti_tb300') and t1 > 2
# tbname in used on meter
sql_error select count(*) from $tb where tbname in ('ti_tb1', 'ti_tb300')
sql select count(*) from $tb where tbname in ('ti_tb1', 'ti_tb300')
## tbname in + group by tag
sql select count(*) from $stb where tbname in ('ti_tb1', 'ti_tb300') group by t1 order by t1 asc
print select count(*) from $stb where tbname in ('ti_tb1', 'ti_tb300') group by t1 order by t1 asc
sql select count(*), t1 from $stb where tbname in ('ti_tb1', 'ti_tb300') group by t1 order by t1 asc
if $rows != 2 then
return -1
endi
......@@ -48,7 +48,7 @@ if $data11 != 300 then
endi
## duplicated tbnames
sql select count(*) from $stb where tbname in ('ti_tb1', 'ti_tb1', 'ti_tb1', 'ti_tb2', 'ti_tb2', 'ti_tb3') group by t1 order by t1 asc
sql select count(*), t1 from $stb where tbname in ('ti_tb1', 'ti_tb1', 'ti_tb1', 'ti_tb2', 'ti_tb2', 'ti_tb3') group by t1 order by t1 asc
if $rows != 3 then
return -1
endi
......@@ -72,7 +72,7 @@ if $data21 != 3 then
endi
## wrong tbnames
sql select count(*) from $stb where tbname in ('tbname in', 'ti_tb1', 'ti_stb0') group by t1 order by t1
sql select count(*), t1 from $stb where tbname in ('tbname in', 'ti_tb1', 'ti_stb0') group by t1 order by t1
if $rows != 1 then
return -1
endi
......@@ -84,7 +84,7 @@ if $data01 != 1 then
endi
## tbname in + colummn filtering
sql select count(*) from $stb where tbname in ('tbname in', 'ti_tb1', 'ti_stb0', 'ti_tb2') and c8 like 'binary%' group by t1 order by t1 asc
sql select count(*), t1 from $stb where tbname in ('tbname in', 'ti_tb1', 'ti_stb0', 'ti_tb2') and c8 like 'binary%' group by t1 order by t1 asc
if $rows != 2 then
return -1
endi
......@@ -102,7 +102,8 @@ if $data11 != 2 then
endi
## tbname in can accpet Upper case table name
sql select count(*) from $stb where tbname in ('ti_tb0', 'TI_tb1', 'TI_TB2') group by t1 order by t1
print select count(*), t1 from $stb where tbname in ('ti_tb0', 'TI_tb1', 'TI_TB2') group by t1 order by t1
sql select count(*), t1 from $stb where tbname in ('ti_tb0', 'TI_tb1', 'TI_TB2') group by t1 order by t1
if $rows != 3 then
return -1
endi
......@@ -126,7 +127,7 @@ if $data21 != 2 then
endi
# multiple tbname in is not allowed NOW
sql_error select count(*) from $stb where tbname in ('ti_tb1', 'ti_tb300') and tbname in ('ti_tb5', 'ti_tb1000') group by t1 order by t1 asc
sql select count(*), t1 from $stb where tbname in ('ti_tb1', 'ti_tb300') and tbname in ('ti_tb5', 'ti_tb1000') group by t1 order by t1 asc
#if $rows != 4 then
# return -1
#endi
......
......@@ -54,10 +54,8 @@ run tsim/parser/timestamp_query.sim
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 100
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 100
run tsim/parser/timestamp_query.sim
sleep 100
sql connect
$dbPrefix = ts_db
......@@ -22,14 +21,14 @@ $tsu = $tsu - $delta
$tsu = $tsu + $ts0
print ==================>issue #3481, normal column not allowed,
sql_error select ts,c1,min(c2) from ts_stb0
sql select ts,c1,min(c2) from ts_stb0
print ==================>issue #4681, not equal operator on primary timestamp not allowed
sql_error select * from ts_stb0 where ts <> $ts0
sql select * from ts_stb0 where ts <> $ts0
##### select from supertable
$tb = $tbPrefix . 0
sql select first(c1), last(c1), (1537325400 - 1537146000)/(5*60) v from $tb where ts >= $ts0 and ts < $tsu interval(5m) fill(value, -1)
sql select _wstart, first(c1), last(c1), (1537325400 - 1537146000)/(5*60) v from $tb where ts >= $ts0 and ts < $tsu interval(5m) fill(value, -1)
$res = $rowNum * 2
$n = $res - 2
print ============>$n
......@@ -43,13 +42,12 @@ if $data03 != 598.000000000 then
return -1
endi
if $data13 != 598.000000000 then
print expect 598.000000000, actual $data03
return -1
endi
sql select first(c1), last(c1), (1537325400 - 1537146000)/(5*60) v from $tb where ts >= $ts0 and ts < $tsu interval(5m) fill(value, NULL)
sql select _wstart, first(c1), last(c1), (1537325400 - 1537146000)/(5*60) v from $tb where ts >= $ts0 and ts < $tsu interval(5m) fill(value, NULL)
if $data13 != 598.000000000 then
print expect 598.000000000, actual $data03
return -1
......
......@@ -20,7 +20,7 @@ $stb = $stbPrefix . $i
sql drop database $db -x step1
step1:
sql create database $db cache 16 maxrows 4096 keep 36500
sql create database $db maxrows 4096 keep 36500
print ====== create tables
sql use $db
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
......@@ -68,7 +68,7 @@ if $row != 100 then
return -1
endi
sql select bottom(c3, 5) from tb_tb1 interval(1y);
sql select _wstart, bottom(c3, 5) from tb_tb1 interval(1y);
if $rows != 5 then
return -1
endi
......@@ -90,7 +90,7 @@ if $data31 != 0.00000 then
return -1
endi
sql select top(c4, 5) from tb_tb1 interval(1y);
sql select _wstart, top(c4, 5) from tb_tb1 interval(1y);
if $rows != 5 then
return -1
endi
......@@ -112,7 +112,7 @@ if $data31 != 9.000000000 then
return -1
endi
sql select top(c3, 5) from tb_tb1 interval(40h)
sql select _wstart, top(c3, 5) from tb_tb1 interval(40h)
if $rows != 25 then
return -1
endi
......@@ -149,7 +149,7 @@ sql insert into test1 values(1537146000006, 7, 7, 7, 7, 6.100000, 6.100000, 0, '
sql insert into test1 values(1537146000007, 8, 8, 8, 8, 7.100000, 7.100000, 1, 'taosdata8', '涛思数据8');
sql insert into test1 values(1537146000008, 9, 9, 9, 9, 8.100000, 8.100000, 0, 'taosdata9', '涛思数据9');
sql insert into test1 values(1537146000009, 10, 10, 10, 10, 9.100000, 9.100000, 1, 'taosdata10', '涛思数据10');
sql select bottom(col5, 10) from test
sql select ts, bottom(col5, 10) from test order by col5;
if $rows != 10 then
return -1
endi
......@@ -177,13 +177,11 @@ sql insert into test values(29999, 1)(70000, 2)(80000, 3)
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
sleep 500
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sleep 100
sql select count(*) from t1.test where ts>10000 and ts<90000 interval(5000a)
sql select count(*) from t1.test where ts > 10000 and ts < 90000 interval(5000a)
if $rows != 3 then
return -1
endi
......@@ -218,7 +216,6 @@ endw
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
sql connect
sleep 100
sql use db;
$ts = 1000
......@@ -270,10 +267,9 @@ sql insert into t2 values('2020-2-2 1:1:1', 1);
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
sql connect
sleep 100
sql use db
sql select count(*), first(ts), last(ts) from t2 interval(1d);
sql select _wstart, count(*), first(ts), last(ts) from t2 interval(1d);
if $rows != 2 then
return -1
endi
......@@ -367,9 +363,9 @@ if $row != 1 then
return -1
endi
sql_error select * from ttm2 where k=null
sql_error select * from ttm2 where k<>null
sql select * from ttm2 where k=null
sql select * from ttm2 where k<>null
sql_error select * from ttm2 where k like null
sql_error select * from ttm2 where k<null
sql select * from ttm2 where k<null
system sh/exec.sh -n dnode1 -s stop -x SIGINT
......@@ -48,6 +48,8 @@ class TDTestCase:
#!for bug
tdDnodes.stoptaosd(1)
sleep(self.delaytime)
if platform.system().lower() == 'windows':
sleep(10)
tdSql.error('select server_status()')
def run(self):
......
......@@ -290,6 +290,52 @@ class TDTestCase:
tdSql.checkData(0, 0, None)
tdSql.query("select last_row(c1) from testdb.stb1")
tdSql.checkData(0, 0, None)
# support regular query about last ,first ,last_row
tdSql.error("select last_row(c1,NULL) from testdb.t1")
tdSql.error("select last_row(NULL) from testdb.t1")
tdSql.error("select last(NULL) from testdb.t1")
tdSql.error("select first(NULL) from testdb.t1")
tdSql.query("select last_row(c1,123) from testdb.t1")
tdSql.checkData(0,0,None)
tdSql.checkData(0,1,123)
tdSql.query("select last_row(123) from testdb.t1")
tdSql.checkData(0,0,123)
tdSql.error("select last(c1,NULL) from testdb.t1")
tdSql.query("select last(c1,123) from testdb.t1")
tdSql.checkData(0,0,9)
tdSql.checkData(0,1,123)
tdSql.error("select first(c1,NULL) from testdb.t1")
tdSql.query("select first(c1,123) from testdb.t1")
tdSql.checkData(0,0,1)
tdSql.checkData(0,1,123)
tdSql.error("select last_row(c1,c2,c3,NULL,c4) from testdb.t1")
tdSql.query("select last_row(c1,c2,c3,123,c4) from testdb.t1")
tdSql.checkData(0,0,None)
tdSql.checkData(0,1,None)
tdSql.checkData(0,2,None)
tdSql.checkData(0,3,123)
tdSql.checkData(0,4,None)
tdSql.error("select last_row(c1,c2,c3,NULL,c4,t1,t2) from testdb.ct1")
tdSql.query("select last_row(c1,c2,c3,123,c4,t1,t2) from testdb.ct1")
tdSql.checkData(0,0,9)
tdSql.checkData(0,1,-99999)
tdSql.checkData(0,2,-999)
tdSql.checkData(0,3,123)
tdSql.checkData(0,4,None)
tdSql.checkData(0,5,0)
tdSql.checkData(0,5,0)
# # bug need fix
tdSql.query("select last_row(c1), c2, c3 , c4, c5 from testdb.t1")
......
......@@ -169,13 +169,13 @@ class TDTestCase:
tdSql.checkData(0,1,self.row_nums)
tdSql.query("select c1 , mavg(c1 ,2 ) from stb partition by c1")
tdSql.checkRows(72)
tdSql.checkRows(90)
tdSql.query("select c1 , diff(c1 , 0) from stb partition by c1")
tdSql.checkRows(72)
tdSql.checkRows(90)
tdSql.query("select c1 , csum(c1) from stb partition by c1")
tdSql.checkRows(80)
tdSql.checkRows(100)
tdSql.query("select c1 , sample(c1,2) from stb partition by c1 order by c1")
tdSql.checkRows(21)
......@@ -191,7 +191,7 @@ class TDTestCase:
tdSql.checkData(0,1,None)
tdSql.query("select c1 , DERIVATIVE(c1,2,1) from stb partition by c1 order by c1")
tdSql.checkRows(72)
tdSql.checkRows(90)
# bug need fix
# tdSql.checkData(0,1,None)
......
from distutils.log import error
import taos
import sys
import time
import socket
import os
import threading
import subprocess
import platform
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.snapshot = 0
self.vgroups = 4
self.ctbNum = 100
self.rowsPerTbl = 1000
def init(self, conn, logSql):
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def prepare_udf_so(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
print(projPath)
if platform.system().lower() == 'windows':
self.libudf1 = subprocess.Popen('(for /r %s %%i in ("udf1.d*") do @echo %%i)|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
if (not tdDnodes.dnodes[0].remoteIP == ""):
tdDnodes.dnodes[0].remote_conn.get(tdDnodes.dnodes[0].config["path"]+'/debug/build/lib/libudf1.so',projPath+"\\debug\\build\\lib\\")
self.libudf1 = self.libudf1.replace('udf1.dll','libudf1.so')
else:
self.libudf1 = subprocess.Popen('find %s -name "libudf1.so"|grep lib|head -n1'%projPath , shell=True, stdout=subprocess.PIPE,stderr=subprocess.STDOUT).stdout.read().decode("utf-8")
self.libudf1 = self.libudf1.replace('\r','').replace('\n','')
return
def create_udf_function(self):
# create scalar functions
tdSql.execute("create function udf1 as '%s' outputtype int bufSize 8;"%self.libudf1)
functions = tdSql.getResult("show functions")
function_nums = len(functions)
if function_nums == 1:
tdLog.info("create one udf functions success ")
else:
tdLog.exit("create udf functions fail")
return
def checkFileContent(self, consumerId, queryString):
buildPath = tdCom.getBuildPath()
cfgPath = tdCom.getClientCfgPath()
dstFile = '%s/../log/dstrows_%d.txt'%(cfgPath, consumerId)
cmdStr = '%s/build/bin/taos -c %s -s "%s >> %s"'%(buildPath, cfgPath, queryString, dstFile)
tdLog.info(cmdStr)
os.system(cmdStr)
consumeRowsFile = '%s/../log/consumerid_%d.txt'%(cfgPath, consumerId)
tdLog.info("rows file: %s, %s"%(consumeRowsFile, dstFile))
consumeFile = open(consumeRowsFile, mode='r')
queryFile = open(dstFile, mode='r')
# skip first line for it is schema
queryFile.readline()
while True:
dst = queryFile.readline()
src = consumeFile.readline()
if dst:
if dst != src:
tdLog.exit("consumerId %d consume rows is not match the rows by direct query"%consumerId)
else:
break
return
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
# tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: multi sub table")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1', 'topic2']
expectRowsList = []
tmqCom.initConsumerTable()
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
# tdLog.info("create stb")
# tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
# tdLog.info("create ctb")
# tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
# tdLog.info("insert data")
# tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("0 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
queryString = "select ts, c1,udf1(c1),sin(udf1(c2)), log(udf1(c2)) from %s.%s where udf1(c1) == 88 or sin(udf1(c1)) > 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
consumerId = 1
topicList = topicNameList[1]
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[1] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("1 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def tmqCase2(self):
tdLog.printNoPrefix("======== test case 2: multi sub table, consume with auto create tble and insert data")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 4,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 100,
'rowsPerTbl': 1000,
'batchNum': 100,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['snapshot'] = self.snapshot
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
topicNameList = ['topic1', 'topic2']
expectRowsList = []
tmqCom.initConsumerTable()
# tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
# tdLog.info("create stb")
# tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
# tdLog.info("create ctb")
# tdCom.create_ctable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"],tag_elm_list=paraDict['tagSchema'],count=paraDict["ctbNum"], default_ctbname_prefix=paraDict['ctbPrefix'])
# tdLog.info("insert data")
# tmqCom.insert_data_1(tdSql,paraDict["dbName"],paraDict["ctbPrefix"],paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"],paraDict["startTs"])
tdLog.info("create topics from stb with filter")
queryString = "select ts,c1,udf1(c1),c2,udf1(c2) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 2
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl)
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
if expectRowsList[0] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("2 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
# reinit consume info, and start tmq_sim, then check consume result
tmqCom.initConsumerTable()
queryString = "select ts, c1,udf1(c1),sin(udf1(c2)), log(udf1(c2)) from %s.%s where udf1(c1) == 88 or sin(udf1(c1)) > 0" %(paraDict['dbName'], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
tdSql.query(queryString)
expectRowsList.append(tdSql.getRows())
consumerId = 3
topicList = topicNameList[1]
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectRowsList[1] != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[1], resultList[0]))
tdLog.exit("3 tmq consume rows error!")
# self.checkFileContent(consumerId, queryString)
# tdLog.printNoPrefix("consumerId %d check data ok!"%(consumerId))
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
# tdSql.prepare()
self.prepare_udf_so()
self.create_udf_function()
tdLog.printNoPrefix("=============================================")
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
self.prepareTestEnv()
self.tmqCase1()
self.tmqCase2()
tdLog.printNoPrefix("====================================================================")
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
self.prepareTestEnv()
self.snapshot = 1
self.tmqCase1()
self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
Subproject commit d8cf0e7e067d193cfaf3e920b6ec6cbb9b9f4165
Subproject commit fa2d82918353a3b56e40838572120c1a4ece644c
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册