提交 0129d8ee 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into refact/submit_req

...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 2aac500 GIT_TAG 9f587e9
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -162,7 +162,10 @@ typedef struct SSyncFSM { ...@@ -162,7 +162,10 @@ typedef struct SSyncFSM {
// SWal implements it // SWal implements it
typedef struct SSyncLogStore { typedef struct SSyncLogStore {
SLRUCache* pCache; SLRUCache* pCache;
void* data; int32_t cacheHit;
int32_t cacheMiss;
void* data;
int32_t (*syncLogUpdateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); int32_t (*syncLogUpdateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index);
SyncIndex (*syncLogCommitIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogCommitIndex)(struct SSyncLogStore* pLogStore);
......
...@@ -43,12 +43,12 @@ extern "C" { ...@@ -43,12 +43,12 @@ extern "C" {
#include <sys/utsname.h> #include <sys/utsname.h>
#include <sys/wait.h> #include <sys/wait.h>
#include <termios.h> #include <termios.h>
#include <cpuid.h>
#if defined(DARWIN) #if defined(DARWIN)
#else #else
#include <argp.h> #include <argp.h>
#include <sys/prctl.h> #include <sys/prctl.h>
#include <cpuid.h>
#endif #endif
#else #else
......
...@@ -2334,7 +2334,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) { ...@@ -2334,7 +2334,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
pColInfoData->info.type = *(int16_t*)pStart; pColInfoData->info.type = *(int8_t*)pStart;
pStart += sizeof(int8_t); pStart += sizeof(int8_t);
pColInfoData->info.bytes = *(int32_t*)pStart; pColInfoData->info.bytes = *(int32_t*)pStart;
......
...@@ -665,18 +665,23 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p ...@@ -665,18 +665,23 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
if (i < pVgroup->replica) { if (i < pVgroup->replica) {
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false); colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false);
bool exist = false;
bool online = false; bool online = false;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId); SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
if (pDnode != NULL) { if (pDnode != NULL) {
exist = true;
online = mndIsDnodeOnline(pDnode, curMs); online = mndIsDnodeOnline(pDnode, curMs);
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
} }
char buf1[20] = {0}; char buf1[20] = {0};
char role[20] = "offline"; char role[20] = "offline";
if (online) { if (!exist) {
strcpy(role, "dropping");
} else if (online) {
bool show = (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER && !pVgroup->vnodeGid[i].syncRestore); bool show = (pVgroup->vnodeGid[i].syncState == TAOS_SYNC_STATE_LEADER && !pVgroup->vnodeGid[i].syncRestore);
snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), show ? "*" : ""); snprintf(role, sizeof(role), "%s%s", syncStr(pVgroup->vnodeGid[i].syncState), show ? "*" : "");
} else {
} }
STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);
......
...@@ -918,8 +918,11 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS ...@@ -918,8 +918,11 @@ void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillS
return; return;
} }
TSKEY realStart = taosTimeAdd(pFillSup->prev.key, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
pFillSup->interval.precision);
pFillInfo->needFill = true; pFillInfo->needFill = true;
pFillInfo->start = start; pFillInfo->start = realStart;
pFillInfo->current = pFillInfo->start; pFillInfo->current = pFillInfo->start;
pFillInfo->end = end; pFillInfo->end = end;
pFillInfo->pos = FILL_POS_INVALID; pFillInfo->pos = FILL_POS_INVALID;
...@@ -1418,9 +1421,13 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { ...@@ -1418,9 +1421,13 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) {
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextLen); code = streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextLen);
} }
// ts will be deleted later
if (delTs != ts) { if (delTs != ts) {
streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &delKey); streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &delKey);
streamStateFreeCur(pCur);
pCur = streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &nextKey);
} }
endTs = nextKey.ts - 1;
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
break; break;
} }
......
...@@ -580,15 +580,19 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, ...@@ -580,15 +580,19 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
} }
// set the output // set the output
if (TSDB_CODE_SUCCESS == code && NULL != pOutputGroupKeys) {
code = createColumnByRewriteExprs(pOutputGroupKeys, &pAgg->node.pTargets);
}
nodesDestroyList(pOutputGroupKeys);
if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) { if (TSDB_CODE_SUCCESS == code && NULL != pAgg->pAggFuncs) {
code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets); code = createColumnByRewriteExprs(pAgg->pAggFuncs, &pAgg->node.pTargets);
} }
if (TSDB_CODE_SUCCESS == code) {
if (NULL != pOutputGroupKeys) {
code = createColumnByRewriteExprs(pOutputGroupKeys, &pAgg->node.pTargets);
} else if (NULL == pAgg->node.pTargets && NULL != pAgg->pGroupKeys) {
code = createColumnByRewriteExprs(pAgg->pGroupKeys, &pAgg->node.pTargets);
}
}
nodesDestroyList(pOutputGroupKeys);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pAgg; *pLogicNode = (SLogicNode*)pAgg;
} else { } else {
......
...@@ -192,6 +192,10 @@ typedef struct SSyncNode { ...@@ -192,6 +192,10 @@ typedef struct SSyncNode {
int64_t leaderTime; int64_t leaderTime;
int64_t lastReplicateTime; int64_t lastReplicateTime;
int32_t electNum;
int32_t becomeLeaderNum;
int32_t configChangeNum;
bool isStart; bool isStart;
} SSyncNode; } SSyncNode;
......
...@@ -205,9 +205,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -205,9 +205,11 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
pLocalEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, hLocal); pLocalEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, hLocal);
code = 0; code = 0;
ths->pLogStore->cacheHit++;
sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", appendIndex, pLocalEntry->bytes, pLocalEntry); sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", appendIndex, pLocalEntry->bytes, pLocalEntry);
} else { } else {
ths->pLogStore->cacheMiss++;
sNTrace(ths, "miss cache index:%" PRId64, appendIndex); sNTrace(ths, "miss cache index:%" PRId64, appendIndex);
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry); code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, appendIndex, &pLocalEntry);
......
...@@ -117,9 +117,11 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -117,9 +117,11 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
if (h) { if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
pSyncNode->pLogStore->cacheHit++;
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", index, pEntry->bytes, pEntry); sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", index, pEntry->bytes, pEntry);
} else { } else {
pSyncNode->pLogStore->cacheMiss++;
sNTrace(pSyncNode, "miss cache index:%" PRId64, index); sNTrace(pSyncNode, "miss cache index:%" PRId64, index);
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry); int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
......
...@@ -61,7 +61,8 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) { ...@@ -61,7 +61,8 @@ static int32_t syncNodeRequestVotePeers(SSyncNode* pNode) {
} }
int32_t syncNodeElect(SSyncNode* pSyncNode) { int32_t syncNodeElect(SSyncNode* pSyncNode) {
sNTrace(pSyncNode, "begin election"); sNInfo(pSyncNode, "begin election");
pSyncNode->electNum++;
int32_t ret = 0; int32_t ret = 0;
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
...@@ -86,7 +87,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -86,7 +87,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
syncNodeCandidate2Leader(pSyncNode); syncNodeCandidate2Leader(pSyncNode);
pSyncNode->pVotesGranted->toLeader = true; pSyncNode->pVotesGranted->toLeader = true;
return ret; return ret;
} }
if (pSyncNode->replicaNum == 1) { if (pSyncNode->replicaNum == 1) {
// only myself, to leader // only myself, to leader
...@@ -98,7 +99,6 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ...@@ -98,7 +99,6 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
syncNodeCandidate2Leader(pSyncNode); syncNodeCandidate2Leader(pSyncNode);
pSyncNode->pVotesGranted->toLeader = true; pSyncNode->pVotesGranted->toLeader = true;
return ret; return ret;
} }
ret = syncNodeRequestVotePeers(pSyncNode); ret = syncNodeRequestVotePeers(pSyncNode);
......
...@@ -410,9 +410,11 @@ bool syncIsReadyForRead(int64_t rid) { ...@@ -410,9 +410,11 @@ bool syncIsReadyForRead(int64_t rid) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
code = 0; code = 0;
pSyncNode->pLogStore->cacheHit++;
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry); sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry);
} else { } else {
pSyncNode->pLogStore->cacheMiss++;
sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex); sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry); code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry);
...@@ -1008,6 +1010,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { ...@@ -1008,6 +1010,10 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID); atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID);
pSyncNode->isStart = true; pSyncNode->isStart = true;
pSyncNode->electNum = 0;
pSyncNode->becomeLeaderNum = 0;
pSyncNode->configChangeNum = 0;
sNTrace(pSyncNode, "sync open, node:%p", pSyncNode); sNTrace(pSyncNode, "sync open, node:%p", pSyncNode);
return pSyncNode; return pSyncNode;
...@@ -1340,6 +1346,8 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde ...@@ -1340,6 +1346,8 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
pSyncNode->pRaftCfg->cfg = *pNewConfig; pSyncNode->pRaftCfg->cfg = *pNewConfig;
pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex; pSyncNode->pRaftCfg->lastConfigIndex = lastConfigChangeIndex;
pSyncNode->configChangeNum++;
bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig); bool IamInOld = syncNodeInConfig(pSyncNode, &oldConfig);
bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig); bool IamInNew = syncNodeInConfig(pSyncNode, pNewConfig);
...@@ -1363,7 +1371,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde ...@@ -1363,7 +1371,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
char newCfgStr[1024] = {0}; char newCfgStr[1024] = {0};
syncCfg2SimpleStr(&oldConfig, oldCfgStr, sizeof(oldCfgStr)); syncCfg2SimpleStr(&oldConfig, oldCfgStr, sizeof(oldCfgStr));
syncCfg2SimpleStr(pNewConfig, oldCfgStr, sizeof(oldCfgStr)); syncCfg2SimpleStr(pNewConfig, oldCfgStr, sizeof(oldCfgStr));
sNTrace(pSyncNode, "begin do config change, from %s to %s", oldCfgStr, oldCfgStr); sNInfo(pSyncNode, "begin do config change, from %s to %s", oldCfgStr, oldCfgStr);
if (IamInNew) { if (IamInNew) {
pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal pSyncNode->pRaftCfg->isStandBy = 0; // change isStandBy to normal
...@@ -1495,13 +1503,13 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde ...@@ -1495,13 +1503,13 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
} else { } else {
// persist cfg // persist cfg
raftCfgPersist(pSyncNode->pRaftCfg); raftCfgPersist(pSyncNode->pRaftCfg);
sNTrace(pSyncNode, "do not config change from %d to %d, index:%" PRId64 ", %s --> %s", oldConfig.replicaNum, sNInfo(pSyncNode, "do not config change from %d to %d, index:%" PRId64 ", %s --> %s", oldConfig.replicaNum,
pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr); pNewConfig->replicaNum, lastConfigChangeIndex, oldCfgStr, newCfgStr);
} }
_END: _END:
// log end config change // log end config change
sNTrace(pSyncNode, "end do config change, from %s to %s", oldCfgStr, newCfgStr); sNInfo(pSyncNode, "end do config change, from %s to %s", oldCfgStr, newCfgStr);
} }
// raft state change -------------- // raft state change --------------
...@@ -1598,6 +1606,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1598,6 +1606,8 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->leaderTime = taosGetTimestampMs(); pSyncNode->leaderTime = taosGetTimestampMs();
pSyncNode->becomeLeaderNum++;
// reset restoreFinish // reset restoreFinish
pSyncNode->restoreFinish = false; pSyncNode->restoreFinish = false;
...@@ -1666,7 +1676,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1666,7 +1676,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
pSyncNode->minMatchIndex = SYNC_INDEX_INVALID; pSyncNode->minMatchIndex = SYNC_INDEX_INVALID;
// trace log // trace log
sNTrace(pSyncNode, "become leader %s", debugStr); sNInfo(pSyncNode, "become leader %s", debugStr);
} }
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {
...@@ -1842,9 +1852,11 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { ...@@ -1842,9 +1852,11 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); pPreEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
code = 0; code = 0;
pSyncNode->pLogStore->cacheHit++;
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry); sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", preIndex, pPreEntry->bytes, pPreEntry);
} else { } else {
pSyncNode->pLogStore->cacheMiss++;
sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex); sNTrace(pSyncNode, "miss cache index:%" PRId64, preIndex);
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry); code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
...@@ -1971,7 +1983,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { ...@@ -1971,7 +1983,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
return; return;
} }
sTrace("enqueue heartbeat timer"); sTrace("vgId:%d, enqueue heartbeat timer", pNode->vgId);
code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg); code = pNode->syncEqMsg(pNode->msgcb, &rpcMsg);
if (code != 0) { if (code != 0) {
sError("failed to enqueue heartbeat msg since %s", terrstr()); sError("failed to enqueue heartbeat msg since %s", terrstr());
...@@ -2526,9 +2538,11 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde ...@@ -2526,9 +2538,11 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
if (h) { if (h) {
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
ths->pLogStore->cacheHit++;
sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", i, pEntry->bytes, pEntry); sNTrace(ths, "hit cache index:%" PRId64 ", bytes:%u, %p", i, pEntry->bytes, pEntry);
} else { } else {
ths->pLogStore->cacheMiss++;
sNTrace(ths, "miss cache index:%" PRId64, i); sNTrace(ths, "miss cache index:%" PRId64, i);
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry); code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
......
...@@ -45,6 +45,9 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { ...@@ -45,6 +45,9 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
return NULL; return NULL;
} }
pLogStore->cacheHit = 0;
pLogStore->cacheMiss = 0;
taosLRUCacheSetStrictCapacity(pLogStore->pCache, false); taosLRUCacheSetStrictCapacity(pLogStore->pCache, false);
pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData)); pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
......
...@@ -80,9 +80,11 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh ...@@ -80,9 +80,11 @@ int32_t syncNodeReplicateOne(SSyncNode* pSyncNode, SRaftId* pDestId, bool snapsh
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h); pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
code = 0; code = 0;
pSyncNode->pLogStore->cacheHit++;
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", nextIndex, pEntry->bytes, pEntry); sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", nextIndex, pEntry->bytes, pEntry);
} else { } else {
pSyncNode->pLogStore->cacheMiss++;
sNTrace(pSyncNode, "miss cache index:%" PRId64, nextIndex); sNTrace(pSyncNode, "miss cache index:%" PRId64, nextIndex);
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry); code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
......
...@@ -52,7 +52,7 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) { ...@@ -52,7 +52,7 @@ static void syncNodeCleanConfigIndex(SSyncNode* ths) {
} }
static int32_t syncNodeTimerRoutine(SSyncNode* ths) { static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
sNTrace(ths, "timer routines"); sNInfo(ths, "timer routines");
// timer replicate // timer replicate
syncNodeReplicate(ths); syncNodeReplicate(ths);
......
...@@ -242,6 +242,9 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ...@@ -242,6 +242,9 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
} }
int32_t cacheHit = pNode->pLogStore->cacheHit;
int32_t cacheMiss = pNode->pLogStore->cacheMiss;
char cfgStr[1024]; char cfgStr[1024];
if (pNode->pRaftCfg != NULL) { if (pNode->pRaftCfg != NULL) {
syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr)); syncCfg2SimpleStr(&(pNode->pRaftCfg->cfg), cfgStr, sizeof(cfgStr));
...@@ -275,18 +278,18 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ...@@ -275,18 +278,18 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
terrno = errCode; terrno = errCode;
if (pNode != NULL && pNode->pRaftCfg != NULL) { if (pNode != NULL && pNode->pRaftCfg != NULL) {
taosPrintLog(flags, level, dflag, taosPrintLog(
"vgId:%d, sync %s " flags, level, dflag,
"%s" "vgId:%d, sync %s "
", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 "%s"
", snap-tm:%" PRIu64 ", sby:%d, aq:%d, snaping:%" PRId64 ", r-num:%d, lcfg:%" PRId64 ", tm:%" PRIu64 ", cmt:%" PRId64 ", fst:%" PRId64 ", lst:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64
", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s", ", snap-tm:%" PRIu64 ", elt-num:%d, bl-num:%d, cc-num:%d, hit:%d, mis:%d, aq:%d, snaping:%" PRId64
pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, ", r-num:%d, lcfg:%" PRId64 ", chging:%d, rsto:%d, dquorum:%d, elt:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s",
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->vgId, syncStr(pNode->state), eventLog, currentTerm, pNode->commitIndex, logBeginIndex, logLastIndex,
pNode->pRaftCfg->isStandBy, aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum,
pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->configChangeNum, cacheHit, cacheMiss, aqItems, pNode->snapshottingIndex, pNode->replicaNum,
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, hbTimeStr, pNode->pRaftCfg->lastConfigIndex, pNode->changing, pNode->restoreFinish, quorum, pNode->electTimerLogicClock,
hbrTimeStr); pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, hbTimeStr, hbrTimeStr);
} }
} }
...@@ -438,7 +441,8 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries ...@@ -438,7 +441,8 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s); host, port, pMsg->term, pMsg->privateTerm, pMsg->success, pMsg->lastSendIndex, pMsg->matchIndex, s);
} }
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed, int64_t execTime) { void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed,
int64_t execTime) {
if (!(sDebugFlag & DEBUG_TRACE)) return; if (!(sDebugFlag & DEBUG_TRACE)) return;
char host[64]; char host[64];
......
...@@ -417,7 +417,7 @@ ...@@ -417,7 +417,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_control.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_control.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py
,,n,system-test,python3 ./test.py -f 0-others/compatibility.py ,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
,,,system-test,python3 ./test.py -f 1-insert/alter_database.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_database.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/influxdb_line_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_telnet_line_taosc_insert.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/opentsdb_json_taosc_insert.py
...@@ -425,7 +425,7 @@ ...@@ -425,7 +425,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/test_stmt_set_tbname_tag.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_stable.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_table.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/alter_table.py
,,,system-test,python3 ./test.py -f 1-insert/boundary.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/boundary.py
,,n,system-test,python3 ./test.py -f 1-insert/insertWithMoreVgroup.py ,,n,system-test,python3 ./test.py -f 1-insert/insertWithMoreVgroup.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_comment.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/table_comment.py
,,n,system-test,python3 ./test.py -f 1-insert/time_range_wise.py ,,n,system-test,python3 ./test.py -f 1-insert/time_range_wise.py
...@@ -612,7 +612,7 @@ ...@@ -612,7 +612,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/update_data.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/update_data.py
,,,system-test,python3 ./test.py -f 1-insert/tb_100w_data_order.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/tb_100w_data_order.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_childtable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_childtable.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_normaltable.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_normaltable.py
...@@ -658,7 +658,7 @@ ...@@ -658,7 +658,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3
,,,system-test,python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRecreateMnode.py -N 5 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRecreateMnode.py -N 5 -M 3
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeStopFollowerLeader.py -N 5 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeStopFollowerLeader.py -N 5 -M 3
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeStop2Follower.py -N 5 -M 3
...@@ -1016,6 +1016,7 @@ ...@@ -1016,6 +1016,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/insert_select.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 99-TDcase/TD-20582.py
#develop test #develop test
,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py ,,n,develop-test,python3 ./test.py -f 5-taos-tools/taosbenchmark/auto_create_table_json.py
......
...@@ -344,14 +344,176 @@ endi ...@@ -344,14 +344,176 @@ endi
sql drop stream if exists streams11;
sql drop stream if exists streams12;
sql drop stream if exists streams13;
sql drop stream if exists streams14;
sql drop stream if exists streams15;
sql drop database if exists test7;
sql create database test7 vgroups 1;
sql use test7;
sql create table t1(ts timestamp, a int, b int , c int, d double, s varchar(20));
sql create stream streams11 trigger at_once into streamt11 as select _wstart as ts, avg(a), count(*), timezone(), to_iso8601(1) from t1 where ts >= 1648791210000 and ts < 1648791240000 interval(1s) fill(NULL);
sql create stream streams12 trigger at_once into streamt12 as select _wstart as ts, avg(a), count(*), timezone(), to_iso8601(1) from t1 where ts >= 1648791210000 and ts < 1648791240000 interval(1s) fill(value,100.0,200);
sql create stream streams13 trigger at_once into streamt13 as select _wstart as ts, avg(a), count(*), timezone(), to_iso8601(1) from t1 where ts >= 1648791210000 and ts < 1648791240000 interval(1s) fill(next);
sql create stream streams14 trigger at_once into streamt14 as select _wstart as ts, avg(a), count(*), timezone(), to_iso8601(1) from t1 where ts >= 1648791210000 and ts < 1648791240000 interval(1s) fill(prev);
sql create stream streams15 trigger at_once into streamt15 as select _wstart as ts, avg(a), count(*), timezone(), to_iso8601(1) from t1 where ts >= 1648791210000 and ts < 1648791240000 interval(1s) fill(linear);
sql insert into t1 values(1648791210000,1,1,1,1.0,'aaa');
sql insert into t1 values(1648791210001,1,1,1,1.0,'aaa');
sql insert into t1 values(1648791215000,2,2,2,2.0,'bbb');
sql insert into t1 values(1648791220000,3,3,3,3.0,'ccc');
sql insert into t1 values(1648791225000,4,4,4,4.0,'fff');
sql insert into t1 values(1648791230000,5,5,5,5.0,'ddd');
sql insert into t1 values(1648791230001,6,6,6,6.0,'eee');
sql insert into t1 values(1648791230002,7,7,7,7.0,'fff');
$loop_count = 0
loop7:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamt11 order by ts;
if $rows != 21 then
print ====streamt11=rows3=$rows
goto loop7
endi
sql select * from streamt12 order by ts;
if $rows != 21 then
print ====streamt12=rows3=$rows
goto loop7
endi
sql select * from streamt13 order by ts;
if $rows != 21 then
print ====streamt13=rows3=$rows
goto loop7
endi
sql select * from streamt14 order by ts;
if $rows != 21 then
print ====streamt14=rows3=$rows
goto loop7
endi
sql select * from streamt15 order by ts;
if $rows != 21 then
print ====streamt15=rows3=$rows
goto loop7
endi
sql delete from t1 where ts > 1648791210001 and ts < 1648791230000;
$loop_count = 0
loop8:
sleep 200
$loop_count = $loop_count + 1
if $loop_count == 20 then
return -1
endi
sql select * from streamt11 order by ts;
if $rows != 21 then
print ====streamt11=rows3=$rows
goto loop8
endi
if $data12 != NULL then
print ====streamt11=3=data01=$data01
goto loop8
endi
if $data[19][2] != NULL then
print ====streamt11=3=data[19][2]=$data[19][2]
goto loop8
endi
sql select * from streamt12 order by ts;
if $rows != 21 then
print ====streamt12=rows3=$rows
goto loop8
endi
if $data12 != 200 then
print ====streamt12=3=data12=$data12
goto loop8
endi
if $data[19][2] != 200 then
print ====streamt12=3=data[19][2]=$data[19][2]
goto loop8
endi
sql select * from streamt13 order by ts;
if $rows != 21 then
print ====streamt13=rows3=$rows
goto loop8
endi
if $data12 != 3 then
print ====streamt13=3=data12=$data12
goto loop8
endi
if $data[19][2] != 3 then
print ====streamt13=3=data[19][2]=$data[19][2]
goto loop8
endi
sql select * from streamt14 order by ts;
if $rows != 21 then
print ====streamt14=rows3=$rows
goto loop8
endi
if $data12 != 2 then
print ====streamt14=3=data12=$data12
goto loop8
endi
if $data[19][2] != 2 then
print ====streamt14=3=data[19][2]=$data[19][2]
goto loop8
endi
sql select * from streamt15 order by ts;
if $rows != 21 then
print ====streamt15=rows3=$rows
goto loop8
endi
if $data12 != 2 then
print ====streamt15=3=data12=$data12
goto loop8
endi
if $data[19][2] != 2 then
print ====streamt15=3=data[19][2]=$data[19][2]
goto loop8
endi
...@@ -384,6 +546,11 @@ sql drop stream if exists streams7; ...@@ -384,6 +546,11 @@ sql drop stream if exists streams7;
sql drop stream if exists streams8; sql drop stream if exists streams8;
sql drop stream if exists streams9; sql drop stream if exists streams9;
sql drop stream if exists streams10; sql drop stream if exists streams10;
sql drop stream if exists streams11;
sql drop stream if exists streams12;
sql drop stream if exists streams13;
sql drop stream if exists streams14;
sql drop stream if exists streams15;
sql use test1; sql use test1;
sql select * from t1; sql select * from t1;
......
import taos
import sys
import datetime
import inspect
import math
from util.log import *
from util.sql import *
from util.cases import *
class TDTestCase:
# updatecfgDict = {'debugFlag': 143 ,"cDebugFlag":143,"uDebugFlag":143 ,"rpcDebugFlag":143 , "tmrDebugFlag":143 ,
# "jniDebugFlag":143 ,"simDebugFlag":143,"dDebugFlag":143, "dDebugFlag":143,"vDebugFlag":143,"mDebugFlag":143,"qDebugFlag":143,
# "wDebugFlag":143,"sDebugFlag":143,"tsdbDebugFlag":143,"tqDebugFlag":143 ,"fsDebugFlag":143 ,"udfDebugFlag":143}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def prepare_datas(self, dbname="db"):
tdSql.execute(
f''' CREATE TABLE ac_stb (TS TIMESTAMP, C1 INT, C2 BIGINT, C3 FLOAT, C4 DOUBLE, C5 BINARY(10), C6 BOOL,
C7 SMALLINT, C8 TINYINT, C9 NCHAR(10)) TAGS (T1 INT);
'''
)
tdSql.execute(
f''' insert into ctb0 using ac_stb tags (1) values ( 1537146001000 , 1,1,1,1,'bin',1,1,1,'________')
( 1537146002000 , 2,2,2,2,'binar', 1,1,1,'nchar');
'''
)
tdSql.execute(
f''' insert into ntb0 using ac_stb tags (-1) values ( 1537146001000 , 1,1,1,1,'bin',1,1,1,'________')
( 1537146002000 , 2,2,2,2,'binar', 1,1,1,'nchar');
'''
)
tdSql.execute(
f''' insert into ntb0 using ac_stb tags (2) values ( 1537146003000 , 1,1,1,1,'bin',1,1,1,'________')
( 1537146004000 , 2,2,2,2,'binar', 1,1,1,'nchar');
'''
)
tdSql.execute(
f''' insert into ctb6 using ac_stb tags(1) values ( 1537146000000 , 1, 1, 1, 1, 'bin1', 1, 1, 1, '________1')
ctb6 using ac_stb tags(2) values ( 1537146000000 , 2, 2, 2, 2, 'bin2', 2, 2, 2, '________2')
ctb6 using ac_stb tags(3) values ( 1537146000000 , 3, 3, 3, 3, 'bin3', 3, 3, 3, '________3')
'''
)
def check_result(self, dbname="db"):
tdSql.query("select c1,c1,c2,c3,c4,c5,c7,c8,c9 from ac_stb")
tdSql.checkRows(7)
tdSql.query("select t1, count(*), first(c9) from ac_stb partition by t1 order by t1 asc slimit 3")
tdSql.checkRows(2)
# TD-20582
tdSql.query("explain analyze verbose true select count(*) from ac_stb where T1=1")
tdSql.checkRows(16)
# TD-20581
tdSql.execute("insert into ntb0 select * from ntb0")
tdSql.query("select * from ntb0")
tdSql.checkRows(4)
return
# basic query
def run(self): # sourcery skip: extract-duplicate-method, remove-redundant-fstring
tdSql.prepare()
tdLog.printNoPrefix("========== step1: create table ==============")
self.prepare_datas()
tdLog.printNoPrefix("========== step2: check results ==============")
self.check_result()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册