提交 a2085703 编写于 作者: B Benguang Zhao

Merge branch main into FIX/TD-21812-main

...@@ -69,6 +69,9 @@ extern int32_t tsElectInterval; ...@@ -69,6 +69,9 @@ extern int32_t tsElectInterval;
extern int32_t tsHeartbeatInterval; extern int32_t tsHeartbeatInterval;
extern int32_t tsHeartbeatTimeout; extern int32_t tsHeartbeatTimeout;
// vnode
extern int64_t tsVndCommitMaxIntervalMs;
// monitor // monitor
extern bool tsEnableMonitor; extern bool tsEnableMonitor;
extern int32_t tsMonitorInterval; extern int32_t tsMonitorInterval;
......
...@@ -49,10 +49,12 @@ extern "C" { ...@@ -49,10 +49,12 @@ extern "C" {
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500 #define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
#define SYNC_SNAP_RESEND_MS 1000 * 60 #define SYNC_SNAP_RESEND_MS 1000 * 60
#define SYNC_VND_COMMIT_MIN_MS 1000
#define SYNC_MAX_BATCH_SIZE 1 #define SYNC_MAX_BATCH_SIZE 1
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1 #define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID -1 // 0xFFFFFFFFFFFFFFFF #define SYNC_TERM_INVALID -1
typedef enum { typedef enum {
SYNC_STRATEGY_NO_SNAPSHOT = 0, SYNC_STRATEGY_NO_SNAPSHOT = 0,
......
...@@ -60,6 +60,9 @@ int32_t tsElectInterval = 25 * 1000; ...@@ -60,6 +60,9 @@ int32_t tsElectInterval = 25 * 1000;
int32_t tsHeartbeatInterval = 1000; int32_t tsHeartbeatInterval = 1000;
int32_t tsHeartbeatTimeout = 20 * 1000; int32_t tsHeartbeatTimeout = 20 * 1000;
// vnode
int64_t tsVndCommitMaxIntervalMs = 60 * 1000;
// monitor // monitor
bool tsEnableMonitor = true; bool tsEnableMonitor = true;
int32_t tsMonitorInterval = 30; int32_t tsMonitorInterval = 30;
...@@ -435,6 +438,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -435,6 +438,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1; if (cfgAddInt32(pCfg, "syncHeartbeatInterval", tsHeartbeatInterval, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1; if (cfgAddInt32(pCfg, "syncHeartbeatTimeout", tsHeartbeatTimeout, 10, 1000 * 60 * 24 * 2, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "vndCommitMaxInterval", tsVndCommitMaxIntervalMs, 1000, 1000 * 60 * 60, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1;
if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1; if (cfgAddString(pCfg, "monitorFqdn", tsMonitorFqdn, 0) != 0) return -1;
...@@ -754,6 +759,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -754,6 +759,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32; tsHeartbeatInterval = cfgGetItem(pCfg, "syncHeartbeatInterval")->i32;
tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32; tsHeartbeatTimeout = cfgGetItem(pCfg, "syncHeartbeatTimeout")->i32;
tsVndCommitMaxIntervalMs = cfgGetItem(pCfg, "vndCommitMaxInterval")->i64;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));
tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath)); tstrncpy(tsUdfdLdLibPath, cfgGetItem(pCfg, "udfdLdLibPath")->str, sizeof(tsUdfdLdLibPath));
......
...@@ -79,6 +79,8 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { ...@@ -79,6 +79,8 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
char path[TSDB_FILENAME_LEN] = {0}; char path[TSDB_FILENAME_LEN] = {0};
vnodeProposeCommitOnNeed(pVnode->pImpl);
taosThreadRwlockWrlock(&pMgmt->lock); taosThreadRwlockWrlock(&pMgmt->lock);
taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t)); taosHashRemove(pMgmt->hash, &pVnode->vgId, sizeof(int32_t));
taosThreadRwlockUnlock(&pMgmt->lock); taosThreadRwlockUnlock(&pMgmt->lock);
......
...@@ -785,9 +785,9 @@ static void mndReloadSyncConfig(SMnode *pMnode) { ...@@ -785,9 +785,9 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
int32_t code = syncReconfig(pMnode->syncMgmt.sync, &cfg); int32_t code = syncReconfig(pMnode->syncMgmt.sync, &cfg);
if (code != 0) { if (code != 0) {
mError("vgId:1, failed to reconfig mnode sync since %s", terrstr()); mError("vgId:1, mnode sync reconfig failed since %s", terrstr());
} else { } else {
mInfo("vgId:1, reconfig mnode sync success"); mInfo("vgId:1, mnode sync reconfig success");
} }
} }
} }
...@@ -243,7 +243,7 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { ...@@ -243,7 +243,7 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
if (pFile == NULL) { if (pFile == NULL) {
taosMemoryFree(pRaw); taosMemoryFree(pRaw);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
mDebug("failed to read sdb file:%s since %s", file, terrstr()); mInfo("read sdb file:%s finished since %s", file, terrstr());
return 0; return 0;
} }
......
...@@ -124,6 +124,7 @@ FAIL: ...@@ -124,6 +124,7 @@ FAIL:
} }
void sndClose(SSnode *pSnode) { void sndClose(SSnode *pSnode) {
streamMetaCommit(pSnode->pMeta);
streamMetaClose(pSnode->pMeta); streamMetaClose(pSnode->pMeta);
taosMemoryFree(pSnode->path); taosMemoryFree(pSnode->path);
taosMemoryFree(pSnode); taosMemoryFree(pSnode);
......
...@@ -89,6 +89,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); ...@@ -89,6 +89,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs); void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
void vnodeProposeCommitOnNeed(SVnode *pVnode);
// meta // meta
typedef struct SMeta SMeta; // todo: remove typedef struct SMeta SMeta; // todo: remove
......
...@@ -86,6 +86,7 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); ...@@ -86,6 +86,7 @@ int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
// vnodeCommit.c // vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode); int32_t vnodeBegin(SVnode* pVnode);
int32_t vnodeShouldCommit(SVnode* pVnode); int32_t vnodeShouldCommit(SVnode* pVnode);
void vnodeUpdCommitSched(SVnode* pVnode);
void vnodeRollback(SVnode* pVnode); void vnodeRollback(SVnode* pVnode);
int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg); int32_t vnodeSaveInfo(const char* dir, const SVnodeInfo* pCfg);
int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo); int32_t vnodeCommitInfo(const char* dir, const SVnodeInfo* pInfo);
...@@ -103,6 +104,7 @@ void vnodeSyncClose(SVnode* pVnode); ...@@ -103,6 +104,7 @@ void vnodeSyncClose(SVnode* pVnode);
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code); void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg, int32_t code);
bool vnodeIsLeader(SVnode* pVnode); bool vnodeIsLeader(SVnode* pVnode);
bool vnodeIsRoleLeader(SVnode* pVnode); bool vnodeIsRoleLeader(SVnode* pVnode);
int vnodeShouldCommit(SVnode* pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -332,6 +332,11 @@ struct STsdbKeepCfg { ...@@ -332,6 +332,11 @@ struct STsdbKeepCfg {
int32_t keep2; int32_t keep2;
}; };
typedef struct SVCommitSched {
int64_t commitMs;
int64_t maxWaitMs;
} SVCommitSched;
struct SVnode { struct SVnode {
char* path; char* path;
SVnodeCfg config; SVnodeCfg config;
...@@ -350,6 +355,7 @@ struct SVnode { ...@@ -350,6 +355,7 @@ struct SVnode {
STQ* pTq; STQ* pTq;
SSink* pSink; SSink* pSink;
tsem_t canCommit; tsem_t canCommit;
SVCommitSched commitSched;
int64_t sync; int64_t sync;
TdThreadMutex lock; TdThreadMutex lock;
bool blocked; bool blocked;
......
...@@ -203,6 +203,7 @@ _err: ...@@ -203,6 +203,7 @@ _err:
int metaClose(SMeta *pMeta) { int metaClose(SMeta *pMeta) {
if (pMeta) { if (pMeta) {
if (pMeta->pEnv) tdbAbort(pMeta->pEnv, pMeta->txn);
if (pMeta->pCache) metaCacheClose(pMeta); if (pMeta->pCache) metaCacheClose(pMeta);
if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
......
...@@ -15,4 +15,11 @@ ...@@ -15,4 +15,11 @@
#include "tq.h" #include "tq.h"
int tqCommit(STQ* pTq) { return tqOffsetCommitFile(pTq->pOffsetStore); } int tqCommit(STQ* pTq) {
if (streamMetaCommit(pTq->pStreamMeta) < 0) {
tqError("vgId:%d, failed to commit stream meta since %s", TD_VID(pTq->pVnode), terrstr());
return -1;
}
return tqOffsetCommitFile(pTq->pOffsetStore);
}
...@@ -1080,6 +1080,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) { ...@@ -1080,6 +1080,8 @@ static int32_t nextRowIterGet(CacheNextRowIter *pIter, TSDBROW **ppRow) {
iMax[nMax] = i; iMax[nMax] = i;
max[nMax++] = pIter->input[i].pRow; max[nMax++] = pIter->input[i].pRow;
} else {
pIter->input[i].next = false;
} }
} }
} }
......
...@@ -2401,6 +2401,19 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock ...@@ -2401,6 +2401,19 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
return code; return code;
} }
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
SSDataBlock* pResBlock = pReader->pResBlock;
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
pResBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
setComposedBlockFlag(pReader, true);
pReader->cost.composedBlocks += 1;
pReader->cost.buildComposedBlockTime += el;
}
static int32_t buildComposedDataBlock(STsdbReader* pReader) { static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -2412,6 +2425,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -2412,6 +2425,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
bool asc = ASCENDING_TRAVERSE(pReader->order); bool asc = ASCENDING_TRAVERSE(pReader->order);
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
double el = 0;
STableBlockScanInfo* pBlockScanInfo = NULL; STableBlockScanInfo* pBlockScanInfo = NULL;
if (pBlockInfo != NULL) { if (pBlockInfo != NULL) {
...@@ -2473,10 +2487,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -2473,10 +2487,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} }
} }
bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);
// no data in last block and block, no need to proceed. // no data in last block and block, no need to proceed.
if ((hasBlockData == false) && (hasBlockLData == false)) { if (hasBlockData == false) {
break; break;
} }
...@@ -2495,15 +2507,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { ...@@ -2495,15 +2507,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
} }
_end: _end:
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; el = (taosGetTimestampUs() - st) / 1000.0;
pResBlock->info.dataLoad = 1; updateComposedBlockInfo(pReader, el, pBlockScanInfo);
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
setComposedBlockFlag(pReader, true);
double el = (taosGetTimestampUs() - st) / 1000.0;
pReader->cost.composedBlocks += 1;
pReader->cost.buildComposedBlockTime += el;
if (pResBlock->info.rows > 0) { if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64 tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
...@@ -2748,6 +2753,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2748,6 +2753,8 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
return code; return code;
} }
SSDataBlock* pResBlock = pReader->pResBlock;
while (1) { while (1) {
// load the last data block of current table // load the last data block of current table
STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter; STableBlockScanInfo* pScanInfo = *(STableBlockScanInfo**)pStatus->pTableIter;
...@@ -2758,15 +2765,33 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { ...@@ -2758,15 +2765,33 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
if (!hasNexTable) { if (!hasNexTable) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
continue; continue;
} }
code = doBuildDataBlock(pReader); int64_t st = taosGetTimestampUs();
if (code != TSDB_CODE_SUCCESS) { while (1) {
return code; bool hasBlockLData = hasDataInLastBlock(pLastBlockReader);
// no data in last block and block, no need to proceed.
if (hasBlockLData == false) {
break;
} }
if (pReader->pResBlock->info.rows > 0) { buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pLastBlockReader);
if (pResBlock->info.rows >= pReader->capacity) {
break;
}
}
double el = (taosGetTimestampUs() - st) / 1000.0;
updateComposedBlockInfo(pReader, el, pScanInfo);
if (pResBlock->info.rows > 0) {
tsdbDebug("%p uid:%" PRIu64 ", composed data block created, brange:%" PRIu64 "-%" PRIu64
" rows:%d, elapsed time:%.2f ms %s",
pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey,
pResBlock->info.rows, el, pReader->idStr);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -58,7 +58,25 @@ int vnodeBegin(SVnode *pVnode) { ...@@ -58,7 +58,25 @@ int vnodeBegin(SVnode *pVnode) {
return 0; return 0;
} }
void vnodeUpdCommitSched(SVnode *pVnode) {
int64_t randNum = taosRand();
pVnode->commitSched.commitMs = taosGetMonoTimestampMs();
pVnode->commitSched.maxWaitMs = tsVndCommitMaxIntervalMs + (randNum % tsVndCommitMaxIntervalMs);
}
int vnodeShouldCommit(SVnode *pVnode) { int vnodeShouldCommit(SVnode *pVnode) {
if (!pVnode->inUse || !osDataSpaceAvailable()) {
return false;
}
SVCommitSched *pSched = &pVnode->commitSched;
int64_t nowMs = taosGetMonoTimestampMs();
return (((pVnode->inUse->size > pVnode->inUse->node.size) && (pSched->commitMs + SYNC_VND_COMMIT_MIN_MS < nowMs)) ||
(pVnode->inUse->size > 0 && pSched->commitMs + pSched->maxWaitMs < nowMs));
}
int vnodeShouldCommitOld(SVnode *pVnode) {
if (pVnode->inUse) { if (pVnode->inUse) {
return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size); return osDataSpaceAvailable() && (pVnode->inUse->size > pVnode->inUse->node.size);
} }
...@@ -247,6 +265,7 @@ _exit: ...@@ -247,6 +265,7 @@ _exit:
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
return code; return code;
} }
int vnodeAsyncCommit(SVnode *pVnode) { int vnodeAsyncCommit(SVnode *pVnode) {
int32_t code = 0; int32_t code = 0;
...@@ -294,7 +313,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) { ...@@ -294,7 +313,9 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
SVnode *pVnode = pInfo->pVnode; SVnode *pVnode = pInfo->pVnode;
vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode), vInfo("vgId:%d, start to commit, commitId:%" PRId64 " version:%" PRId64 " term: %" PRId64, TD_VID(pVnode),
pInfo->info.state.commitID, pInfo->info.state.committed, pVnode->state.commitTerm); pInfo->info.state.commitID, pInfo->info.state.committed, pInfo->info.state.commitTerm);
vnodeUpdCommitSched(pVnode);
// persist wal before starting // persist wal before starting
if (walPersist(pVnode->pWal) < 0) { if (walPersist(pVnode->pWal) < 0) {
......
...@@ -160,6 +160,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -160,6 +160,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
taosThreadMutexInit(&pVnode->mutex, NULL); taosThreadMutexInit(&pVnode->mutex, NULL);
taosThreadCondInit(&pVnode->poolNotEmpty, NULL); taosThreadCondInit(&pVnode->poolNotEmpty, NULL);
vnodeUpdCommitSched(pVnode);
int8_t rollback = vnodeShouldRollback(pVnode); int8_t rollback = vnodeShouldRollback(pVnode);
// open buffer pool // open buffer pool
...@@ -254,7 +256,7 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); } ...@@ -254,7 +256,7 @@ void vnodePostClose(SVnode *pVnode) { vnodeSyncPostClose(pVnode); }
void vnodeClose(SVnode *pVnode) { void vnodeClose(SVnode *pVnode) {
if (pVnode) { if (pVnode) {
vnodeSyncCommit(pVnode); tsem_wait(&pVnode->canCommit);
vnodeSyncClose(pVnode); vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode); vnodeQueryClose(pVnode);
walClose(pVnode->pWal); walClose(pVnode->pWal);
...@@ -263,6 +265,8 @@ void vnodeClose(SVnode *pVnode) { ...@@ -263,6 +265,8 @@ void vnodeClose(SVnode *pVnode) {
smaClose(pVnode->pSma); smaClose(pVnode->pSma);
metaClose(pVnode->pMeta); metaClose(pVnode->pMeta);
vnodeCloseBufPool(pVnode); vnodeCloseBufPool(pVnode);
tsem_post(&pVnode->canCommit);
// destroy handle // destroy handle
tsem_destroy(&(pVnode->canCommit)); tsem_destroy(&(pVnode->canCommit));
tsem_destroy(&pVnode->syncSem); tsem_destroy(&pVnode->syncSem);
......
...@@ -203,6 +203,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -203,6 +203,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
// skip header // skip header
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
len = pMsg->contLen - sizeof(SMsgHead); len = pMsg->contLen - sizeof(SMsgHead);
bool needCommit = false;
switch (pMsg->msgType) { switch (pMsg->msgType) {
/* META */ /* META */
...@@ -299,9 +300,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -299,9 +300,8 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp); vnodeProcessAlterConfigReq(pVnode, version, pReq, len, pRsp);
break; break;
case TDMT_VND_COMMIT: case TDMT_VND_COMMIT:
vnodeSyncCommit(pVnode); needCommit = true;
vnodeBegin(pVnode); break;
goto _exit;
default: default:
vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType); vError("vgId:%d, unprocessed msg, %d", TD_VID(pVnode), pMsg->msgType);
return -1; return -1;
...@@ -318,7 +318,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp ...@@ -318,7 +318,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
} }
// commit if need // commit if need
if (vnodeShouldCommit(pVnode)) { if (needCommit) {
vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version); vInfo("vgId:%d, commit at version %" PRId64, TD_VID(pVnode), version);
vnodeAsyncCommit(pVnode); vnodeAsyncCommit(pVnode);
......
...@@ -101,6 +101,64 @@ static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code) ...@@ -101,6 +101,64 @@ static void vnodeHandleProposeError(SVnode *pVnode, SRpcMsg *pMsg, int32_t code)
} }
} }
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
int64_t seq = 0;
taosThreadMutexLock(&pVnode->lock);
int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
if (wait) {
ASSERT(!pVnode->blocked);
pVnode->blocked = true;
pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq;
#if 0
pVnode->blockInfo = pMsg->info;
#endif
}
taosThreadMutexUnlock(&pVnode->lock);
if (code > 0) {
vnodeHandleWriteMsg(pVnode, pMsg);
} else if (code < 0) {
if (terrno != 0) code = terrno;
vnodeHandleProposeError(pVnode, pMsg, code);
}
if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
return code;
}
void vnodeProposeCommitOnNeed(SVnode *pVnode) {
if (!vnodeShouldCommit(pVnode)) {
return;
}
int32_t contLen = sizeof(SMsgHead);
SMsgHead *pHead = rpcMallocCont(contLen);
pHead->contLen = contLen;
pHead->vgId = pVnode->config.vgId;
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_VND_COMMIT;
rpcMsg.contLen = contLen;
rpcMsg.pCont = pHead;
rpcMsg.info.noResp = 1;
bool isWeak = false;
if (vnodeProposeMsg(pVnode, &rpcMsg, isWeak) < 0) {
vTrace("vgId:%d, failed to propose vnode commit since %s", pVnode->config.vgId, terrstr());
goto _out;
}
vInfo("vgId:%d, proposed vnode commit", pVnode->config.vgId);
_out:
vnodeUpdCommitSched(pVnode);
rpcFreeCont(rpcMsg.pCont);
rpcMsg.pCont = NULL;
}
#if BATCH_ENABLE #if BATCH_ENABLE
static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) { static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool *pIsWeakArr, int32_t *arrSize) {
...@@ -178,6 +236,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) ...@@ -178,6 +236,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
continue; continue;
} }
vnodeProposeCommitOnNeed(pVnode);
code = vnodePreProcessWriteMsg(pVnode, pMsg); code = vnodePreProcessWriteMsg(pVnode, pMsg);
if (code != 0) { if (code != 0) {
vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
...@@ -205,34 +265,6 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) ...@@ -205,34 +265,6 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
#else #else
static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak) {
int64_t seq = 0;
taosThreadMutexLock(&pVnode->lock);
int32_t code = syncPropose(pVnode->sync, pMsg, isWeak, &seq);
bool wait = (code == 0 && vnodeIsMsgBlock(pMsg->msgType));
if (wait) {
ASSERT(!pVnode->blocked);
pVnode->blocked = true;
pVnode->blockSec = taosGetTimestampSec();
pVnode->blockSeq = seq;
#if 0
pVnode->blockInfo = pMsg->info;
#endif
}
taosThreadMutexUnlock(&pVnode->lock);
if (code > 0) {
vnodeHandleWriteMsg(pVnode, pMsg);
} else if (code < 0) {
if (terrno != 0) code = terrno;
vnodeHandleProposeError(pVnode, pMsg, code);
}
if (wait) vnodeWaitBlockMsg(pVnode, pMsg);
return code;
}
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnode *pVnode = pInfo->ahandle; SVnode *pVnode = pInfo->ahandle;
int32_t vgId = pVnode->config.vgId; int32_t vgId = pVnode->config.vgId;
...@@ -256,6 +288,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) ...@@ -256,6 +288,8 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
continue; continue;
} }
vnodeProposeCommitOnNeed(pVnode);
code = vnodePreProcessWriteMsg(pVnode, pMsg); code = vnodePreProcessWriteMsg(pVnode, pMsg);
if (code != 0) { if (code != 0) {
vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
......
...@@ -69,8 +69,7 @@ _err: ...@@ -69,8 +69,7 @@ _err:
} }
void streamMetaClose(SStreamMeta* pMeta) { void streamMetaClose(SStreamMeta* pMeta) {
tdbCommit(pMeta->db, pMeta->txn); tdbAbort(pMeta->db, pMeta->txn);
tdbPostCommit(pMeta->db, pMeta->txn);
tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pCheckpointDb); tdbTbClose(pMeta->pCheckpointDb);
tdbClose(pMeta->db); tdbClose(pMeta->db);
...@@ -88,6 +87,7 @@ void streamMetaClose(SStreamMeta* pMeta) { ...@@ -88,6 +87,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
/*streamMetaReleaseTask(pMeta, pTask);*/ /*streamMetaReleaseTask(pMeta, pTask);*/
} }
taosHashCleanup(pMeta->pTasks); taosHashCleanup(pMeta->pTasks);
taosHashCleanup(pMeta->pRecoverStatus);
taosMemoryFree(pMeta->path); taosMemoryFree(pMeta->path);
taosMemoryFree(pMeta); taosMemoryFree(pMeta);
} }
......
...@@ -247,8 +247,8 @@ typedef struct SyncLocalCmd { ...@@ -247,8 +247,8 @@ typedef struct SyncLocalCmd {
SRaftId destId; SRaftId destId;
int32_t cmd; int32_t cmd;
SyncTerm sdNewTerm; // step down new term SyncTerm currentTerm; // step down new term
SyncIndex fcIndex; // follower commit index SyncIndex commitIndex; // follower commit index
} SyncLocalCmd; } SyncLocalCmd;
int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode); int32_t syncBuildTimeout(SRpcMsg* pMsg, ESyncTimeoutType ttype, uint64_t logicClock, int32_t ms, SSyncNode* pNode);
......
...@@ -98,6 +98,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode); ...@@ -98,6 +98,7 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode);
// access // access
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf); int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf);
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf);
int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry); int32_t syncLogBufferAppend(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry);
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm); int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm);
int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm); int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* pMatchTerm);
......
...@@ -90,6 +90,7 @@ ...@@ -90,6 +90,7 @@
// //
int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) { int32_t syncNodeFollowerCommit(SSyncNode* ths, SyncIndex newCommitIndex) {
ASSERT(false && "deprecated");
if (ths->state != TAOS_SYNC_STATE_FOLLOWER) { if (ths->state != TAOS_SYNC_STATE_FOLLOWER) {
sNTrace(ths, "can not do follower commit"); sNTrace(ths, "can not do follower commit");
return -1; return -1;
...@@ -206,12 +207,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -206,12 +207,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
accepted = true; accepted = true;
_SEND_RESPONSE: _SEND_RESPONSE:
pEntry = NULL;
pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm); pReply->matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, &pReply->lastMatchTerm);
bool matched = (pReply->matchIndex >= pReply->lastSendIndex); bool matched = (pReply->matchIndex >= pReply->lastSendIndex);
if (accepted && matched) { if (accepted && matched) {
pReply->success = true; pReply->success = true;
// update commit index only after matching // update commit index only after matching
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex); (void)syncNodeUpdateCommitIndex(ths, TMIN(pMsg->commitIndex, pReply->lastSendIndex));
} }
// ack, i.e. send response // ack, i.e. send response
......
...@@ -44,6 +44,7 @@ ...@@ -44,6 +44,7 @@
// /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>> // /\ UNCHANGED <<messages, serverVars, candidateVars, leaderVars, log>>
// //
void syncOneReplicaAdvance(SSyncNode* pSyncNode) { void syncOneReplicaAdvance(SSyncNode* pSyncNode) {
ASSERT(false && "deprecated");
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
sError("pSyncNode is NULL"); sError("pSyncNode is NULL");
return; return;
......
...@@ -114,7 +114,7 @@ void syncHbTimerDataRemove(int64_t rid) { taosRemoveRef(gHbDataRefId, rid); } ...@@ -114,7 +114,7 @@ void syncHbTimerDataRemove(int64_t rid) { taosRemoveRef(gHbDataRefId, rid); }
SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) { SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) {
SSyncHbTimerData *pData = taosAcquireRef(gHbDataRefId, rid); SSyncHbTimerData *pData = taosAcquireRef(gHbDataRefId, rid);
if (pData == NULL) { if (pData == NULL && rid > 0) {
sInfo("failed to acquire hb-timer-data from refId:%" PRId64, rid); sInfo("failed to acquire hb-timer-data from refId:%" PRId64, rid);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
} }
......
...@@ -1030,6 +1030,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) { ...@@ -1030,6 +1030,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo) {
} }
} }
pSyncNode->commitIndex = commitIndex; pSyncNode->commitIndex = commitIndex;
sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) { if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) {
goto _error; goto _error;
...@@ -1170,9 +1171,10 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { ...@@ -1170,9 +1171,10 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) {
} }
ASSERT(endIndex == lastVer + 1); ASSERT(endIndex == lastVer + 1);
commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex);
sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex);
if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, commitIndex) < 0) { if (syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) {
return -1; return -1;
} }
...@@ -1480,16 +1482,21 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg ...@@ -1480,16 +1482,21 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg
} }
} }
int32_t code = -1;
if (pNode->syncSendMSg != NULL && epSet != NULL) { if (pNode->syncSendMSg != NULL && epSet != NULL) {
syncUtilMsgHtoN(pMsg->pCont); syncUtilMsgHtoN(pMsg->pCont);
pMsg->info.noResp = 1; pMsg->info.noResp = 1;
return pNode->syncSendMSg(epSet, pMsg); code = pNode->syncSendMSg(epSet, pMsg);
} else { }
sError("vgId:%d, sync send msg by id error, fp:%p epset:%p", pNode->vgId, pNode->syncSendMSg, epSet);
if (code < 0) {
sError("vgId:%d, sync send msg by id error, epset:%p dnode:%d addr:%" PRId64 " err:0x%x", pNode->vgId, epSet,
DID(destRaftId), destRaftId->addr, terrno);
rpcFreeCont(pMsg->pCont); rpcFreeCont(pMsg->pCont);
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1;
} }
return code;
} }
inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) { inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) {
...@@ -2537,8 +2544,9 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2537,8 +2544,9 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT; pSyncMsg->cmd = SYNC_LOCAL_CMD_FOLLOWER_CMT;
pSyncMsg->fcIndex = pMsg->commitIndex; pSyncMsg->commitIndex = pMsg->commitIndex;
SyncIndex fcIndex = pSyncMsg->fcIndex; pSyncMsg->currentTerm = pMsg->term;
SyncIndex fcIndex = pSyncMsg->commitIndex;
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
...@@ -2559,7 +2567,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2559,7 +2567,8 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont; SyncLocalCmd* pSyncMsg = rpcMsgLocalCmd.pCont;
pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN; pSyncMsg->cmd = SYNC_LOCAL_CMD_STEP_DOWN;
pSyncMsg->sdNewTerm = pMsg->term; pSyncMsg->currentTerm = pMsg->term;
pSyncMsg->commitIndex = pMsg->commitIndex;
if (ths->syncEqMsg != NULL && ths->msgcb != NULL) { if (ths->syncEqMsg != NULL && ths->msgcb != NULL) {
int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd); int32_t code = ths->syncEqMsg(ths->msgcb, &rpcMsgLocalCmd);
...@@ -2567,7 +2576,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2567,7 +2576,7 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code); sError("vgId:%d, sync enqueue step-down msg error, code:%d", ths->vgId, code);
rpcFreeCont(rpcMsgLocalCmd.pCont); rpcFreeCont(rpcMsgLocalCmd.pCont);
} else { } else {
sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->sdNewTerm); sTrace("vgId:%d, sync enqueue step-down msg, new-term: %" PRId64, ths->vgId, pSyncMsg->currentTerm);
} }
} }
} }
...@@ -2625,10 +2634,13 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2625,10 +2634,13 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
syncLogRecvLocalCmd(ths, pMsg, ""); syncLogRecvLocalCmd(ths, pMsg, "");
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->sdNewTerm); syncNodeStepDown(ths, pMsg->currentTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
(void)syncNodeUpdateCommitIndex(ths, pMsg->fcIndex); SyncTerm matchTerm = syncLogBufferGetLastMatchTerm(ths->pLogBuf);
if (pMsg->currentTerm == matchTerm) {
(void)syncNodeUpdateCommitIndex(ths, pMsg->commitIndex);
}
if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { if (syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {
sError("vgId:%d, failed to commit raft log since %s. commit index: %" PRId64 "", ths->vgId, terrstr(), sError("vgId:%d, failed to commit raft log since %s. commit index: %" PRId64 "", ths->vgId, terrstr(),
ths->commitIndex); ths->commitIndex);
...@@ -2641,14 +2653,15 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) { ...@@ -2641,14 +2653,15 @@ int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
} }
int32_t syncNodeOnLocalCmdOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) { int32_t syncNodeOnLocalCmdOld(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
ASSERT(false && "deprecated");
SyncLocalCmd* pMsg = pRpcMsg->pCont; SyncLocalCmd* pMsg = pRpcMsg->pCont;
syncLogRecvLocalCmd(ths, pMsg, ""); syncLogRecvLocalCmd(ths, pMsg, "");
if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) { if (pMsg->cmd == SYNC_LOCAL_CMD_STEP_DOWN) {
syncNodeStepDown(ths, pMsg->sdNewTerm); syncNodeStepDown(ths, pMsg->currentTerm);
} else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) { } else if (pMsg->cmd == SYNC_LOCAL_CMD_FOLLOWER_CMT) {
syncNodeFollowerCommit(ths, pMsg->fcIndex); syncNodeFollowerCommit(ths, pMsg->commitIndex);
} else { } else {
sError("error local cmd"); sError("error local cmd");
......
...@@ -31,6 +31,10 @@ static bool syncIsMsgBlock(tmsg_t type) { ...@@ -31,6 +31,10 @@ static bool syncIsMsgBlock(tmsg_t type) {
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM); (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
} }
FORCE_INLINE static int64_t syncGetRetryMaxWaitMs() {
return SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF);
}
int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) { int64_t syncLogBufferGetEndIndex(SSyncLogBuffer* pBuf) {
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
int64_t index = pBuf->endIndex; int64_t index = pBuf->endIndex;
...@@ -264,20 +268,27 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) { ...@@ -264,20 +268,27 @@ int32_t syncLogBufferReInit(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
return ret; return ret;
} }
FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) { FORCE_INLINE SyncTerm syncLogBufferGetLastMatchTermWithoutLock(SSyncLogBuffer* pBuf) {
SyncIndex index = pBuf->matchIndex; SyncIndex index = pBuf->matchIndex;
SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem; SSyncRaftEntry* pEntry = pBuf->entries[(index + pBuf->size) % pBuf->size].pItem;
ASSERT(pEntry != NULL); ASSERT(pEntry != NULL);
return pEntry->term; return pEntry->term;
} }
SyncTerm syncLogBufferGetLastMatchTerm(SSyncLogBuffer* pBuf) {
taosThreadMutexLock(&pBuf->mutex);
SyncTerm term = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
taosThreadMutexUnlock(&pBuf->mutex);
return term;
}
int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) { int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEntry* pEntry, SyncTerm prevTerm) {
taosThreadMutexLock(&pBuf->mutex); taosThreadMutexLock(&pBuf->mutex);
syncLogBufferValidate(pBuf); syncLogBufferValidate(pBuf);
int32_t ret = -1; int32_t ret = -1;
SyncIndex index = pEntry->index; SyncIndex index = pEntry->index;
SyncIndex prevIndex = pEntry->index - 1; SyncIndex prevIndex = pEntry->index - 1;
SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTerm(pBuf); SyncTerm lastMatchTerm = syncLogBufferGetLastMatchTermWithoutLock(pBuf);
SSyncRaftEntry* pExist = NULL; SSyncRaftEntry* pExist = NULL;
bool inBuf = true; bool inBuf = true;
...@@ -329,6 +340,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt ...@@ -329,6 +340,8 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
} }
// update // update
ASSERT(pBuf->startIndex < index);
ASSERT(index - pBuf->startIndex < pBuf->size);
ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL); ASSERT(pBuf->entries[index % pBuf->size].pItem == NULL);
SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm}; SSyncLogBufEntry tmp = {.pItem = pEntry, .prevLogIndex = prevIndex, .prevLogTerm = prevTerm};
pEntry = NULL; pEntry = NULL;
...@@ -456,6 +469,11 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn ...@@ -456,6 +469,11 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn
pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode); pNode->vgId, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType), applyCode);
} }
if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
sInfo("vgId:%d, fsm execute vnode commit. index: %" PRId64 ", term: %" PRId64 "", pNode->vgId, pEntry->index,
pEntry->term);
}
SRpcMsg rpcMsg = {.code = applyCode}; SRpcMsg rpcMsg = {.code = applyCode};
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
...@@ -552,7 +570,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm ...@@ -552,7 +570,8 @@ int32_t syncLogBufferCommit(SSyncLogBuffer* pBuf, SSyncNode* pNode, int64_t comm
ret = 0; ret = 0;
_out: _out:
// mark as restored if needed // mark as restored if needed
if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex) { if (!pNode->restoreFinish && pBuf->commitIndex >= pNode->commitIndex && pEntry != NULL &&
pNode->pRaftStore->currentTerm <= pEntry->term) {
pNode->pFsm->FpRestoreFinishCb(pNode->pFsm); pNode->pFsm->FpRestoreFinishCb(pNode->pFsm);
pNode->restoreFinish = true; pNode->restoreFinish = true;
sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, sInfo("vgId:%d, restore finished. log buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
...@@ -613,6 +632,12 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -613,6 +632,12 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
} }
if (pMgr->states[pos].acked) { if (pMgr->states[pos].acked) {
if (pMgr->matchIndex < index && pMgr->states[pos].timeMs + (syncGetRetryMaxWaitMs() << 3) < nowMs) {
syncLogReplMgrReset(pMgr);
sWarn("vgId:%d, reset sync log repl mgr since stagnation. index: %" PRId64 ", peer: %" PRIx64, pNode->vgId,
index, pDestId->addr);
goto _out;
}
continue; continue;
} }
...@@ -639,10 +664,12 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -639,10 +664,12 @@ int32_t syncLogReplMgrRetryOnNeed(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
_out: _out:
if (retried) { if (retried) {
pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr); pMgr->retryBackoff = syncLogGetNextRetryBackoff(pMgr);
sInfo("vgId:%d, resent %d sync log entries. dest: %" PRIx64 ", indexes: %" PRId64 " ..., terms: ... %" PRId64 SSyncLogBuffer* pBuf = pNode->pLogBuf;
", retryWaitMs: %" PRId64 ", repl mgr: [%" PRId64 " %" PRId64 ", %" PRId64 ")", sInfo("vgId:%d, resend %d sync log entries. dest: %" PRIx64 ", indexes: %" PRId64 " ..., terms: ... %" PRId64
", retryWaitMs: %" PRId64 ", mgr: [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
" %" PRId64 ", %" PRId64 ")",
pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex, pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
pMgr->endIndex); pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
} }
return ret; return ret;
} }
...@@ -771,7 +798,7 @@ int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { ...@@ -771,7 +798,7 @@ int32_t syncLogReplMgrReplicateOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) { int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex index) {
ASSERT(!pMgr->restored); ASSERT(!pMgr->restored);
ASSERT(pMgr->startIndex >= 0); ASSERT(pMgr->startIndex >= 0);
int64_t retryMaxWaitMs = SYNC_LOG_REPL_RETRY_WAIT_MS * (1 << SYNC_MAX_RETRY_BACKOFF); int64_t retryMaxWaitMs = syncGetRetryMaxWaitMs();
int64_t nowMs = taosGetMonoTimestampMs(); int64_t nowMs = taosGetMonoTimestampMs();
if (pMgr->endIndex > pMgr->startIndex && if (pMgr->endIndex > pMgr->startIndex &&
...@@ -799,10 +826,9 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode ...@@ -799,10 +826,9 @@ int32_t syncLogReplMgrReplicateProbeOnce(SSyncLogReplMgr* pMgr, SSyncNode* pNode
pMgr->endIndex = index + 1; pMgr->endIndex = index + 1;
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
sTrace("vgId:%d, attempted to probe the %d'th peer with msg of index:%" PRId64 " term: %" PRId64 sInfo("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term: %" PRId64 ". mgr (rs:%d): [%" PRId64
". pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 "), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
")", pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pNode->vgId, pMgr->peerId, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0; return 0;
} }
...@@ -815,9 +841,11 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p ...@@ -815,9 +841,11 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
int32_t count = 0; int32_t count = 0;
int64_t nowMs = taosGetMonoTimestampMs(); int64_t nowMs = taosGetMonoTimestampMs();
int64_t limit = pMgr->size >> 1; int64_t limit = pMgr->size >> 1;
SyncTerm term = -1;
SyncIndex firstIndex = -1;
for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) { for (SyncIndex index = pMgr->endIndex; index <= pNode->pLogBuf->matchIndex; index++) {
if (batchSize < count++ || limit <= index - pMgr->startIndex) { if (batchSize < count || limit <= index - pMgr->startIndex) {
break; break;
} }
if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) { if (pMgr->startIndex + 1 < index && pMgr->states[(index - 1) % pMgr->size].barrier) {
...@@ -826,7 +854,6 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p ...@@ -826,7 +854,6 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
int64_t pos = index % pMgr->size; int64_t pos = index % pMgr->size;
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false; bool barrier = false;
SyncTerm term = -1;
if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) { if (syncLogBufferReplicateOneTo(pMgr, pNode, index, &term, pDestId, &barrier) < 0) {
sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId, sError("vgId:%d, failed to replicate log entry since %s. index: %" PRId64 ", dest: 0x%016" PRIx64 "", pNode->vgId,
terrstr(), index, pDestId->addr); terrstr(), index, pDestId->addr);
...@@ -837,6 +864,9 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p ...@@ -837,6 +864,9 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
pMgr->states[pos].term = term; pMgr->states[pos].term = term;
pMgr->states[pos].acked = false; pMgr->states[pos].acked = false;
if (firstIndex == -1) firstIndex = index;
count++;
pMgr->endIndex = index + 1; pMgr->endIndex = index + 1;
if (barrier) { if (barrier) {
sInfo("vgId:%d, replicated sync barrier to dest: %" PRIx64 ". index: %" PRId64 ", term: %" PRId64 sInfo("vgId:%d, replicated sync barrier to dest: %" PRIx64 ". index: %" PRId64 ", term: %" PRId64
...@@ -850,10 +880,11 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p ...@@ -850,10 +880,11 @@ int32_t syncLogReplMgrReplicateAttemptedOnce(SSyncLogReplMgr* pMgr, SSyncNode* p
syncLogReplMgrRetryOnNeed(pMgr, pNode); syncLogReplMgrRetryOnNeed(pMgr, pNode);
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
sTrace("vgId:%d, attempted to replicate %d msgs to the %d'th peer. pMgr(rs:%d): [%" PRId64 " %" PRId64 ", %" PRId64 sTrace("vgId:%d, replicated %d msgs to peer: %" PRIx64 ". indexes: %" PRId64 "..., terms: ...%" PRId64
"), pBuf: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", ", mgr: (rs:%d) [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
pNode->vgId, count, pMgr->peerId, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, ")",
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex,
pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0; return 0;
} }
...@@ -985,6 +1016,10 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) { ...@@ -985,6 +1016,10 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) { int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex toIndex) {
ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex); ASSERT(pBuf->commitIndex < toIndex && toIndex <= pBuf->endIndex);
if (toIndex == pBuf->endIndex) {
return 0;
}
sInfo("vgId:%d, rollback sync log buffer. toindex: %" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64 sInfo("vgId:%d, rollback sync log buffer. toindex: %" PRId64 ", buffer: [%" PRId64 " %" PRId64 " %" PRId64
", %" PRId64 ")", ", %" PRId64 ")",
pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pNode->vgId, toIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
......
...@@ -219,6 +219,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -219,6 +219,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
ASSERT(pEntry->index == index); ASSERT(pEntry->index == index);
if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
walFsync(pWal, true);
}
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
return 0; return 0;
...@@ -312,29 +316,6 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn ...@@ -312,29 +316,6 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
// need not truncate
SyncIndex wallastVer = walGetLastVer(pWal);
if (fromIndex > wallastVer) {
return 0;
}
// need not truncate
SyncIndex walCommitVer = walGetCommittedVer(pWal);
if (fromIndex <= walCommitVer) {
return 0;
}
// delete from cache
for (SyncIndex index = fromIndex; index <= wallastVer; ++index) {
SLRUCache* pCache = pData->pSyncNode->pLogStore->pCache;
LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index));
if (h) {
sNTrace(pData->pSyncNode, "cache delete index:%" PRId64, index);
taosLRUCacheRelease(pData->pSyncNode->pLogStore->pCache, h, true);
}
}
int32_t code = walRollback(pWal, fromIndex); int32_t code = walRollback(pWal, fromIndex);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
......
...@@ -17,20 +17,20 @@ ...@@ -17,20 +17,20 @@
#include "syncUtil.h" #include "syncUtil.h"
#include "syncIndexMgr.h" #include "syncIndexMgr.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncPipeline.h"
#include "syncRaftCfg.h" #include "syncRaftCfg.h"
#include "syncRaftStore.h" #include "syncRaftStore.h"
#include "syncSnapshot.h" #include "syncSnapshot.h"
void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) { void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
int32_t len = snprintf(buf, bufLen, "{r-num:%d, my:%d, ", pCfg->replicaNum, pCfg->myIndex); int32_t len = snprintf(buf, bufLen, "{num:%d, as:%d, [", pCfg->replicaNum, pCfg->myIndex);
for (int32_t i = 0; i < pCfg->replicaNum; ++i) { for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
len += snprintf(buf + len, bufLen - len, "%s:%d", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
if (i < pCfg->replicaNum - 1) { if (i < pCfg->replicaNum - 1) {
len += snprintf(buf + len, bufLen - len, "%s:%d, ", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort); len += snprintf(buf + len, bufLen - len, "%s", ", ");
} else {
len += snprintf(buf + len, bufLen - len, "%s:%d}", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
} }
} }
len += snprintf(buf + len, bufLen - len, "%s", "]}");
} }
void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) { void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
...@@ -89,32 +89,55 @@ bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && ...@@ -89,32 +89,55 @@ bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP &&
// for leader // for leader
static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { static void syncHearbeatReplyTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
int32_t len = 5; int32_t len = 0;
len += snprintf(buf + len, bufLen - len, "%s", "{");
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i])); int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->replicasId[i]));
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs);
if (i < pSyncNode->replicaNum - 1) { if (i < pSyncNode->replicaNum - 1) {
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs); len += snprintf(buf + len, bufLen - len, "%s", ",");
} else {
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs);
} }
} }
len += snprintf(buf + len, bufLen - len, "%s", "}");
} }
// for follower // for follower
static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { static void syncHearbeatTime2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
int32_t len = 4; int32_t len = 0;
len += snprintf(buf + len, bufLen - len, "%s", "{");
for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) { for (int32_t i = 0; i < pSyncNode->replicaNum; ++i) {
int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i])); int64_t tsMs = syncIndexMgrGetRecvTime(pSyncNode->pNextIndex, &(pSyncNode->replicasId[i]));
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64, i, tsMs);
if (i < pSyncNode->replicaNum - 1) { if (i < pSyncNode->replicaNum - 1) {
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 ",", i, tsMs); len += snprintf(buf + len, bufLen - len, "%s", ",");
} else {
len += snprintf(buf + len, bufLen - len, "%d:%" PRId64 "}", i, tsMs);
} }
} }
len += snprintf(buf + len, bufLen - len, "%s", "}");
}
static void syncLogBufferStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
SSyncLogBuffer* pBuf = pSyncNode->pLogBuf;
if (pBuf == NULL) {
return;
}
int len = 0;
len += snprintf(buf + len, bufLen - len, "[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pBuf->startIndex,
pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
}
static void syncLogReplMgrStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
int len = 0;
len += snprintf(buf + len, bufLen - len, "%s", "{");
for (int32_t i = 0; i < pSyncNode->replicaNum; i++) {
SSyncLogReplMgr* pMgr = pSyncNode->logReplMgrs[i];
if (pMgr == NULL) break;
len += snprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 " %" PRId64 ", %" PRId64 ")", i, pMgr->restored,
pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
if (i + 1 < pSyncNode->replicaNum) {
len += snprintf(buf + len, bufLen - len, "%s", ", ");
}
}
len += snprintf(buf + len, bufLen - len, "%s", "}");
} }
static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) { static void syncPeerState2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
...@@ -156,16 +179,19 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ...@@ -156,16 +179,19 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
int32_t cacheHit = pNode->pLogStore->cacheHit; int32_t cacheHit = pNode->pLogStore->cacheHit;
int32_t cacheMiss = pNode->pLogStore->cacheMiss; int32_t cacheMiss = pNode->pLogStore->cacheMiss;
char cfgStr[1024]; char cfgStr[1024] = "";
syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr)); syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));
char peerStr[1024] = "{"; char replMgrStatesStr[1024] = "";
syncPeerState2Str(pNode, peerStr, sizeof(peerStr)); syncLogReplMgrStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr));
char hbrTimeStr[256] = "hbr:{"; char bufferStatesStr[256] = "";
syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr));
char hbrTimeStr[256] = "";
syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr)); syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr));
char hbTimeStr[256] = "hb:{"; char hbTimeStr[256] = "";
syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr)); syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr));
char eventLog[512]; // {0}; char eventLog[512]; // {0};
...@@ -186,16 +212,16 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo ...@@ -186,16 +212,16 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, SSyncNo
taosPrintLog(flags, level, dflag, taosPrintLog(flags, level, dflag,
"vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64 "vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", first-ver:%" PRId64
", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64 ", last-ver:%" PRId64 ", min:%" PRId64 ", snap:%" PRId64 ", snap-term:%" PRIu64
", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hit:%d, mis:%d, hb-slow:%d, hbr-slow:%d, " ", elect-times:%d, as-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64 "aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64 ", %s, %s, %s, %s", ", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
", buffer:%s, repl-mgrs:%s, members:%s, hb:%s, hb-reply:%s",
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex, pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, logBeginIndex,
logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->electNum,
pNode->becomeLeaderNum, pNode->configChangeNum, cacheHit, cacheMiss, pNode->hbSlowNum, pNode->becomeLeaderNum, pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems,
pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex, pNode->replicaNum, pNode->snapshottingIndex, pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing,
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser,
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr, hbTimeStr, bufferStatesStr, replMgrStatesStr, cfgStr, hbTimeStr, hbrTimeStr);
hbrTimeStr);
} }
} }
...@@ -216,7 +242,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla ...@@ -216,7 +242,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
} }
char cfgStr[1024]; char cfgStr[1024] = "";
syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr)); syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));
char peerStr[1024] = "{"; char peerStr[1024] = "{";
...@@ -262,7 +288,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df ...@@ -262,7 +288,7 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); logBeginIndex = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore);
} }
char cfgStr[1024]; char cfgStr[1024] = "";
syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr)); syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));
char peerStr[1024] = "{"; char peerStr[1024] = "{";
...@@ -304,7 +330,7 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* ...@@ -304,7 +330,7 @@ void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char*
void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) { void syncLogRecvLocalCmd(SSyncNode* pSyncNode, const SyncLocalCmd* pMsg, const char* s) {
sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd, sNTrace(pSyncNode, "recv sync-local-cmd {cmd:%d-%s, sd-new-term:%" PRId64 ", fc-index:%" PRId64 "}, %s", pMsg->cmd,
syncLocalCmdGetStr(pMsg->cmd), pMsg->sdNewTerm, pMsg->fcIndex, s); syncLocalCmdGetStr(pMsg->cmd), pMsg->currentTerm, pMsg->commitIndex, s);
} }
void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) { void syncLogSendAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntriesReply* pMsg, const char* s) {
......
...@@ -74,7 +74,12 @@ int32_t tdbTbcUpsert(TBC *pTbc, const void *pKey, int nKey, const void *pData, i ...@@ -74,7 +74,12 @@ int32_t tdbTbcUpsert(TBC *pTbc, const void *pKey, int nKey, const void *pData, i
int32_t tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *), int32_t tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void (*xFree)(void *, void *),
void *xArg, int flags); void *xArg, int flags);
int32_t tdbTxnClose(TXN *pTxn); int32_t tdbTxnCloseImpl(TXN *pTxn);
#define tdbTxnClose(pTxn) \
do { \
tdbTxnCloseImpl(pTxn); \
(pTxn) = NULL; \
} while (0)
// other // other
void tdbFree(void *); void tdbFree(void *);
......
...@@ -28,13 +28,18 @@ int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void ...@@ -28,13 +28,18 @@ int tdbTxnOpen(TXN *pTxn, int64_t txnid, void *(*xMalloc)(void *, size_t), void
return 0; return 0;
} }
int tdbTxnClose(TXN *pTxn) { int tdbTxnCloseImpl(TXN *pTxn) {
if (pTxn) { if (pTxn) {
if (pTxn->jPageSet) { if (pTxn->jPageSet) {
hashset_destroy(pTxn->jPageSet); hashset_destroy(pTxn->jPageSet);
pTxn->jPageSet = NULL; pTxn->jPageSet = NULL;
} }
if (pTxn->jfd) {
tdbOsClose(pTxn->jfd);
ASSERT(pTxn->jfd == NULL);
}
tdbOsFree(pTxn); tdbOsFree(pTxn);
} }
......
...@@ -1126,7 +1126,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { ...@@ -1126,7 +1126,7 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) {
int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb); int ret = uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, cliConnCb);
if (ret != 0) { if (ret != 0) {
tGTrace("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port, tGError("%s conn %p failed to connect to %s:%d, reason:%s", pTransInst->label, conn, conn->ip, conn->port,
uv_err_name(ret)); uv_err_name(ret));
uv_timer_stop(conn->timer); uv_timer_stop(conn->timer);
......
...@@ -292,19 +292,10 @@ void walAlignVersions(SWal* pWal) { ...@@ -292,19 +292,10 @@ void walAlignVersions(SWal* pWal) {
} }
pWal->vers.lastVer = pWal->vers.snapshotVer; pWal->vers.lastVer = pWal->vers.snapshotVer;
} }
if (pWal->vers.commitVer < pWal->vers.snapshotVer) { // reset commitVer and appliedVer
wWarn("vgId:%d, commitVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". align with it.", pWal->cfg.vgId,
pWal->vers.commitVer, pWal->vers.snapshotVer);
pWal->vers.commitVer = pWal->vers.snapshotVer; pWal->vers.commitVer = pWal->vers.snapshotVer;
}
if (pWal->vers.appliedVer < pWal->vers.snapshotVer) {
wWarn("vgId:%d, appliedVer:%" PRId64 " is less than snapshotVer:%" PRId64 ". align with it.", pWal->cfg.vgId,
pWal->vers.appliedVer, pWal->vers.snapshotVer);
pWal->vers.appliedVer = pWal->vers.snapshotVer; pWal->vers.appliedVer = pWal->vers.snapshotVer;
} wInfo("vgId:%d, reset commitVer to %" PRId64, pWal->cfg.vgId, pWal->vers.commitVer);
pWal->vers.commitVer = TMIN(pWal->vers.lastVer, pWal->vers.commitVer);
pWal->vers.appliedVer = TMIN(pWal->vers.commitVer, pWal->vers.appliedVer);
} }
bool walLogEntriesComplete(const SWal* pWal) { bool walLogEntriesComplete(const SWal* pWal) {
......
...@@ -105,7 +105,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { ...@@ -105,7 +105,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver); wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver);
int64_t code; int64_t code;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) { if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) {
terrno = TSDB_CODE_WAL_INVALID_VER; terrno = TSDB_CODE_WAL_INVALID_VER;
taosThreadMutexUnlock(&pWal->mutex); taosThreadMutexUnlock(&pWal->mutex);
return -1; return -1;
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
$dbPrefix = reg_db
$tb = tb
$rowNum = 8200
$ts0 = 1537146000000
$delta = 100
print ========== reg.sim
$i = 0
$db = $dbPrefix . $i
sql drop database if exists $db -x step1
step1:
sql create database $db vgroups 1;
sql use $db
sql create table $tb (ts timestamp, c1 int)
$i = 0
$ts = $ts0
$x = 0
while $x < $rowNum
$xs = $x * $delta
$ts = $ts0 + $xs
sql insert into $tb values ( $ts , $x )
$x = $x + 1
endw
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
print ================== server restart completed
sql connect
sql use $db
sql delete from $tb where ts=1537146000000
sql delete from $tb where ts=1537146409500
print =========================> TS-2410
sql select * from $tb limit 20 offset 4090
print $data00
print $data10
print $data20
print $data30
print $data40
print $data50
print $data60
print $data70
print $data80
print $data90
if $data40 != @18-09-17 09:06:49.600@ then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
...@@ -82,8 +82,8 @@ endi ...@@ -82,8 +82,8 @@ endi
#=================================================================== #===================================================================
#==================== reboot to trigger commit data to file #==================== reboot to trigger commit data to file
sql flush database d0;
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
......
...@@ -85,6 +85,7 @@ endi ...@@ -85,6 +85,7 @@ endi
#==================== reboot to trigger commit data to file #==================== reboot to trigger commit data to file
sql flush database d0;
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册