提交 6f80993d 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact

...@@ -250,7 +250,7 @@ The [Taos] structure is the connection manager in [libtaos] and provides two mai ...@@ -250,7 +250,7 @@ The [Taos] structure is the connection manager in [libtaos] and provides two mai
Column information is stored using [ColumnMeta]. Column information is stored using [ColumnMeta].
``rust ```rust
let cols = &q.column_meta; let cols = &q.column_meta;
for col in cols { for col in cols {
println!("name: {}, type: {:?} , bytes: {}", col.name, col.type_, col.bytes); println!("name: {}, type: {:?} , bytes: {}", col.name, col.type_, col.bytes);
......
...@@ -148,6 +148,10 @@ static int32_t mmStart(SMnodeMgmt *pMgmt) { ...@@ -148,6 +148,10 @@ static int32_t mmStart(SMnodeMgmt *pMgmt) {
static void mmStop(SMnodeMgmt *pMgmt) { static void mmStop(SMnodeMgmt *pMgmt) {
dDebug("mnode-mgmt start to stop"); dDebug("mnode-mgmt start to stop");
taosThreadRwlockWrlock(&pMgmt->lock);
pMgmt->stopped = 1;
taosThreadRwlockUnlock(&pMgmt->lock);
mndStop(pMgmt->pMnode); mndStop(pMgmt->pMnode);
} }
......
...@@ -220,9 +220,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) { ...@@ -220,9 +220,6 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
} }
void mmStopWorker(SMnodeMgmt *pMgmt) { void mmStopWorker(SMnodeMgmt *pMgmt) {
taosThreadRwlockWrlock(&pMgmt->lock);
pMgmt->stopped = 1;
taosThreadRwlockUnlock(&pMgmt->lock);
while (pMgmt->refCount > 0) taosMsleep(10); while (pMgmt->refCount > 0) taosMsleep(10);
tSingleWorkerCleanup(&pMgmt->monitorWorker); tSingleWorkerCleanup(&pMgmt->monitorWorker);
......
...@@ -228,7 +228,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -228,7 +228,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED; terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED;
code = terrno; code = terrno;
goto _OVER; return 0;
} }
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId);
......
...@@ -505,6 +505,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) { ...@@ -505,6 +505,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
} }
static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
if (!IsReq(pMsg)) return 0;
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0; if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER &&
......
...@@ -742,7 +742,7 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) { ...@@ -742,7 +742,7 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
pMgmt->errCode = 0; pMgmt->errCode = 0;
pMgmt->transId = -1; pMgmt->transId = -1;
tsem_wait(&pMgmt->syncSem); tsem_wait(&pMgmt->syncSem);
mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode)); mInfo("alter mnode sync result:0x%x %s", pMgmt->errCode, tstrerror(pMgmt->errCode));
terrno = pMgmt->errCode; terrno = pMgmt->errCode;
return pMgmt->errCode; return pMgmt->errCode;
} }
......
...@@ -59,6 +59,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM ...@@ -59,6 +59,7 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM
if (pMgmt->errCode != 0) { if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode)); mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
} }
pMgmt->transId = 0;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
} else { } else {
STrans *pTrans = mndAcquireTrans(pMnode, transId); STrans *pTrans = mndAcquireTrans(pMnode, transId);
...@@ -122,6 +123,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM ...@@ -122,6 +123,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
if (pMgmt->errCode != 0) { if (pMgmt->errCode != 0) {
mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode)); mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
} }
pMgmt->transId = 0;
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
} }
} }
...@@ -258,13 +260,17 @@ void mndSyncStart(SMnode *pMnode) { ...@@ -258,13 +260,17 @@ void mndSyncStart(SMnode *pMnode) {
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby); mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
} }
void mndSyncStop(SMnode *pMnode) {} void mndSyncStop(SMnode *pMnode) {
if (pMnode->syncMgmt.transId != 0) {
pMnode->syncMgmt.transId = 0;
tsem_post(&pMnode->syncMgmt.syncSem);
}
}
bool mndIsMaster(SMnode *pMnode) { bool mndIsMaster(SMnode *pMnode) {
SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSyncMgmt *pMgmt = &pMnode->syncMgmt;
ESyncState state = syncGetMyRole(pMgmt->sync); if (!syncIsReady(pMgmt->sync)) {
if (state != TAOS_SYNC_STATE_LEADER) {
terrno = TSDB_CODE_SYN_NOT_LEADER; terrno = TSDB_CODE_SYN_NOT_LEADER;
return false; return false;
} }
......
...@@ -466,10 +466,10 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe ...@@ -466,10 +466,10 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
} }
// process request // process request
// if (metaDropSTable(pVnode->pMeta, version, &req) < 0) { if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
// rcode = terrno; rcode = terrno;
// goto _exit; goto _exit;
// } }
// return rsp // return rsp
_exit: _exit:
......
...@@ -3507,8 +3507,7 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) ...@@ -3507,8 +3507,7 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock)
// check for the limitation in each group // check for the limitation in each group
if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) { if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pRes->info.rows >= pProjectInfo->limit.limit) {
pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput); pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput);
if (pProjectInfo->slimit.limit > 0 && pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
if (pProjectInfo->slimit.limit == -1 || pProjectInfo->slimit.limit <= pProjectInfo->curGroupOutput) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
} }
......
...@@ -331,7 +331,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { ...@@ -331,7 +331,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
} }
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
closeAllResultRows(&pInfo->binfo.resultRowInfo);
#if 0 #if 0
if(pOperator->fpSet.encodeResultRow){ if(pOperator->fpSet.encodeResultRow){
......
...@@ -2309,14 +2309,14 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN ...@@ -2309,14 +2309,14 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order); pInfo->pSortInfo = generateSortByTsInfo(pInfo->cond.order);
pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false); pInfo->pSortInputBlock = createOneDataBlock(pInfo->pResBlock, false);
int32_t rowSize = pInfo->pResBlock->info.rowSize; int32_t rowSize = pInfo->pResBlock->info.rowSize;
pInfo->bufPageSize = rowSize < 1024 ? 1024 : rowSize * 2; int32_t blockMetaSize = (int32_t)blockDataGetSerialMetaSize(pInfo->pResBlock->info.numOfCols);
pInfo->bufPageSize = (rowSize * 2 + blockMetaSize) < 1024 ? 1024 : (rowSize * 2 + blockMetaSize);
pInfo->sortBufSize = pInfo->bufPageSize * 16; pInfo->sortBufSize = pInfo->bufPageSize * 16;
pInfo->hasGroupId = false; pInfo->hasGroupId = false;
pInfo->prefetchedTuple = NULL; pInfo->prefetchedTuple = NULL;
pOperator->name = "TableMergeScanOperator"; pOperator->name = "TableMergeScanOperator";
// TODO : change it pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blocking = false; pOperator->blocking = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
pOperator->info = pInfo; pOperator->info = pInfo;
......
...@@ -1384,8 +1384,9 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) { ...@@ -1384,8 +1384,9 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, static void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex);
int32_t rowIndex);
static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rIndex);
int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
...@@ -1407,11 +1408,23 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -1407,11 +1408,23 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
if (pEntryInfo->numOfRes > 0) { if (pEntryInfo->numOfRes > 0) {
setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow); setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow);
} else {
setNullSelectivityValue(pCtx, pBlock, currentRow);
} }
return pEntryInfo->numOfRes; return pEntryInfo->numOfRes;
} }
void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) {
for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) {
SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j];
int32_t dstSlotId = pc->pExpr->base.resSchema.slotId;
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
colDataAppendNULL(pDstCol, rowIndex);
}
}
void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex) { void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos* pTuplePos, int32_t rowIndex) {
int32_t pageId = pTuplePos->pageId; int32_t pageId = pTuplePos->pageId;
int32_t offset = pTuplePos->offset; int32_t offset = pTuplePos->offset;
...@@ -4627,8 +4640,6 @@ int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { ...@@ -4627,8 +4640,6 @@ int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
STailItem* pItem = pInfo->pItems[i]; STailItem* pItem = pInfo->pItems[i];
colDataAppend(pCol, currentRow, pItem->data, false); colDataAppend(pCol, currentRow, pItem->data, false);
// setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow);
currentRow += 1; currentRow += 1;
} }
......
...@@ -713,8 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { ...@@ -713,8 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries // delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0); ASSERT(code == 0);
sDebug("vgId:%d sync event currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, ths->pRaftStore->currentTerm, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId,
delBegin, delEnd); syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd);
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
return code; return code;
...@@ -880,6 +880,72 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -880,6 +880,72 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
} }
} while (0); } while (0);
// fake match2
//
// condition1:
// preIndex <= my commit index
//
// operation:
// if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry
// match my-commit-index or my-commit-index + 1
// no operation on log
do {
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
(pMsg->prevLogIndex <= ths->commitIndex);
if (condition) {
sTrace("recv SyncAppendEntries, fake match2, msg-prevLogIndex:%ld, my-commitIndex:%ld", pMsg->prevLogIndex,
ths->commitIndex);
SyncIndex matchIndex = ths->commitIndex;
bool hasAppendEntries = pMsg->dataLen > 0;
if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) {
// append entry
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
ASSERT(pAppendEntry != NULL);
{
// has extra entries (> preIndex) in local log
SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
bool hasExtraEntries = logLastIndex > pMsg->prevLogIndex;
if (hasExtraEntries) {
// make log same, rollback deleted entries
code = syncNodeMakeLogSame(ths, pMsg);
ASSERT(code == 0);
}
}
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0);
// pre commit
code = syncNodePreCommit(ths, pAppendEntry);
ASSERT(code == 0);
matchIndex = pMsg->prevLogIndex + 1;
syncEntryDestory(pAppendEntry);
}
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = true;
pReply->matchIndex = matchIndex;
// send response
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
return ret;
}
} while (0);
// calculate logOK here, before will coredump, due to fake match // calculate logOK here, before will coredump, due to fake match
bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg); bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg);
...@@ -995,8 +1061,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ...@@ -995,8 +1061,10 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;
ths->commitIndex = snapshot.lastApplyIndex; ths->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", ths->vgId, sDebug(
ths->pRaftStore->currentTerm, commitBegin, commitEnd, syncUtilState2String(ths->state)); "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
commitBegin, commitEnd);
} }
SyncIndex beginIndex = ths->commitIndex + 1; SyncIndex beginIndex = ths->commitIndex + 1;
......
...@@ -190,19 +190,23 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries ...@@ -190,19 +190,23 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if (gRaftDetailLog) { if (gRaftDetailLog) {
char* s = snapshotSender2Str(pSender); char* s = snapshotSender2Str(pSender);
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu "
"sender:%s", "sender:%s",
ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex, ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm, s); pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, s);
taosMemoryFree(s); taosMemoryFree(s);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu", "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu",
ths->vgId, ths->pRaftStore->currentTerm, host, port, pSender->snapshot.lastApplyIndex, ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->privateTerm); pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm);
} }
} }
......
...@@ -56,9 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { ...@@ -56,9 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex commitEnd = snapshot.lastApplyIndex; SyncIndex commitEnd = snapshot.lastApplyIndex;
pSyncNode->commitIndex = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex;
sDebug("vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s", pSyncNode->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld",
pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
syncUtilState2String(pSyncNode->state)); pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex);
} }
// update commit index // update commit index
......
...@@ -575,8 +575,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -575,8 +575,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return -1; return -1;
} }
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType); syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
TMSG_INFO(pMsg->msgType), pMsg->msgType);
ret = syncNodePropose(pSyncNode, pMsg, isWeak); ret = syncNodePropose(pSyncNode, pMsg, isWeak);
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
...@@ -585,8 +586,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -585,8 +586,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0; int32_t ret = 0;
sDebug("vgId:%d sync event currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType); syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm,
TMSG_INFO(pMsg->msgType), pMsg->msgType);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
SRespStub stub; SRespStub stub;
...@@ -832,7 +834,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { ...@@ -832,7 +834,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// snapshot meta // snapshot meta
// pSyncNode->sMeta.lastConfigIndex = -1; // pSyncNode->sMeta.lastConfigIndex = -1;
sDebug("vgId:%d sync event currentTerm:%lu sync open", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm); sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
return pSyncNode; return pSyncNode;
} }
...@@ -879,7 +882,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { ...@@ -879,7 +882,8 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
} }
void syncNodeClose(SSyncNode* pSyncNode) { void syncNodeClose(SSyncNode* pSyncNode) {
sDebug("vgId:%d sync event currentTerm:%lu sync close", pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm); sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId,
syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm);
int32_t ret; int32_t ret;
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
...@@ -1318,7 +1322,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1318,7 +1322,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
oldSenders[i] = (pSyncNode->senders)[i]; oldSenders[i] = (pSyncNode->senders)[i];
sDebug("vgId:%d sync event currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm); pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm);
if (gRaftDetailLog) { if (gRaftDetailLog) {
; ;
...@@ -1371,9 +1376,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1371,9 +1376,12 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
char host[128]; char host[128];
uint16_t port; uint16_t port;
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
sDebug("vgId:%d sync event currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu", sDebug(
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, "
oldSenders[j], oldSenders[j]->privateTerm); "privateTerm:%lu",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j],
oldSenders[j]->privateTerm);
(pSyncNode->senders)[i] = oldSenders[j]; (pSyncNode->senders)[i] = oldSenders[j];
oldSenders[j] = NULL; oldSenders[j] = NULL;
reset = true; reset = true;
...@@ -1381,9 +1389,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1381,9 +1389,11 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
// reset replicaIndex // reset replicaIndex
int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex;
(pSyncNode->senders)[i]->replicaIndex = i; (pSyncNode->senders)[i]->replicaIndex = i;
sDebug("vgId:%d sync event currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d", sDebug(
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, "
(pSyncNode->senders)[i], reset); "reset:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset);
} }
} }
} }
...@@ -1392,9 +1402,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1392,9 +1402,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if ((pSyncNode->senders)[i] == NULL) { if ((pSyncNode->senders)[i] == NULL) {
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
sDebug("vgId:%d sync event currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", sDebug(
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu",
(pSyncNode->senders)[i]->privateTerm); pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm);
} }
} }
...@@ -1402,7 +1413,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex ...@@ -1402,7 +1413,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
if (oldSenders[i] != NULL) { if (oldSenders[i] != NULL) {
snapshotSenderDestroy(oldSenders[i]); snapshotSenderDestroy(oldSenders[i]);
sDebug("vgId:%d sync event currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); pSyncNode->pRaftStore->currentTerm, oldSenders[i], i);
oldSenders[i] = NULL; oldSenders[i] = NULL;
} }
...@@ -1474,9 +1486,10 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { ...@@ -1474,9 +1486,10 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) {
void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s", "restoreFinish:%d, %s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr); pSyncNode->restoreFinish, debugStr);
// maybe clear leader cache // maybe clear leader cache
...@@ -1514,9 +1527,12 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { ...@@ -1514,9 +1527,12 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
// reset restoreFinish // reset restoreFinish
pSyncNode->restoreFinish = false; pSyncNode->restoreFinish = false;
sDebug("vgId:%d sync event currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s", sDebug(
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, "
pSyncNode->restoreFinish, debugStr); "restoreFinish:%d, %s",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum,
pSyncNode->restoreFinish, debugStr);
// state change // state change
pSyncNode->state = TAOS_SYNC_STATE_LEADER; pSyncNode->state = TAOS_SYNC_STATE_LEADER;
...@@ -2090,13 +2106,15 @@ const char* syncStr(ESyncState state) { ...@@ -2090,13 +2106,15 @@ const char* syncStr(ESyncState state) {
static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) {
SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg);
sDebug("vgId:%d sync event currentTerm:%lu begin leader transfer", ths->vgId, ths->pRaftStore->currentTerm); sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId,
syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm);
if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) {
sDebug("vgId:%d sync event currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId,
ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort,
pSyncLeaderTransfer->newLeaderId.addr);
// reset elect timer now! // reset elect timer now!
int32_t electMS = 1; int32_t electMS = 1;
...@@ -2174,8 +2192,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ...@@ -2174,8 +2192,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
// change isStandBy to normal // change isStandBy to normal
if (!isDrop) { if (!isDrop) {
char tmpbuf[128]; char tmpbuf[512];
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum); char* oldStr = syncCfg2Str(&oldSyncCfg);
char* newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change from %d to %d, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(ths, tmpbuf); syncNodeBecomeLeader(ths, tmpbuf);
} else { } else {
...@@ -2183,8 +2209,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ...@@ -2183,8 +2209,16 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
} }
} }
} else { } else {
char tmpbuf[128]; char tmpbuf[512];
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d", oldSyncCfg.replicaNum, newSyncCfg.replicaNum); char* oldStr = syncCfg2Str(&oldSyncCfg);
char* newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change2 from %d to %d, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
syncNodeBecomeFollower(ths, tmpbuf); syncNodeBecomeFollower(ths, tmpbuf);
} }
...@@ -2218,8 +2252,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE ...@@ -2218,8 +2252,10 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0; int32_t code = 0;
ESyncState state = flag; ESyncState state = flag;
sDebug("vgId:%d sync event currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64
ths->pRaftStore->currentTerm, beginIndex, endIndex, syncUtilState2String(state)); ", %s",
ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex,
endIndex, syncUtilState2String(state));
// execute fsm // execute fsm
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
...@@ -2267,8 +2303,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ...@@ -2267,8 +2303,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths->pFsm->FpRestoreFinishCb(ths->pFsm); ths->pFsm->FpRestoreFinishCb(ths->pFsm);
} }
ths->restoreFinish = true; ths->restoreFinish = true;
sDebug("vgId:%d sync event currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId,
ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), pEntry->index); syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm,
syncUtilState2String(ths->state), pEntry->index);
} }
} }
......
...@@ -163,10 +163,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ...@@ -163,10 +163,12 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync(pWal, true); walFsync(pWal, true);
sDebug("vgId:%d sync event currentTerm:%lu write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", sDebug(
pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, "
syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, "originalRpcType:%s,%d",
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
return code; return code;
} }
...@@ -323,10 +325,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ...@@ -323,10 +325,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
walFsync(pWal, true); walFsync(pWal, true);
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu old write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d", "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, "
pData->pSyncNode->vgId, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, "originalRpcType:%s,%d",
syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->pRaftCfg->isStandBy, TMSG_INFO(pEntry->msgType), pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex,
pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy,
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
return code; return code;
} }
...@@ -407,18 +410,20 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { ...@@ -407,18 +410,20 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
} }
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SSyncLogStoreData* pData = pLogStore->data;
// assert(walCommit(pWal, index) == 0); SWal* pWal = pData->pWal;
int32_t code = walCommit(pWal, index); // assert(walCommit(pWal, index) == 0);
if (code != 0) { int32_t code = walCommit(pWal, index);
int32_t err = terrno; if (code != 0) {
const char* errStr = tstrerror(err); int32_t err = terrno;
int32_t linuxErr = errno; const char* errStr = tstrerror(err);
const char* linuxErrMsg = strerror(errno); int32_t linuxErr = errno;
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg); const char* linuxErrMsg = strerror(errno);
ASSERT(0); sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
} linuxErrMsg); ASSERT(0);
}
return 0; return 0;
} }
......
...@@ -46,9 +46,11 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { ...@@ -46,9 +46,11 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub)); taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, sDebug(
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p",
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosThreadMutexUnlock(&(pObj->mutex)); taosThreadMutexUnlock(&(pObj->mutex));
return keyCode; return keyCode;
...@@ -71,9 +73,12 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { ...@@ -71,9 +73,12 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
memcpy(pStub, pTmp, sizeof(SRespStub)); memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p", sDebug(
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p "
index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); "ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosThreadMutexUnlock(&(pObj->mutex)); taosThreadMutexUnlock(&(pObj->mutex));
return 1; // get one object return 1; // get one object
...@@ -90,9 +95,12 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu ...@@ -90,9 +95,12 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
memcpy(pStub, pTmp, sizeof(SRespStub)); memcpy(pStub, pTmp, sizeof(SRespStub));
SSyncNode *pSyncNode = pObj->data; SSyncNode *pSyncNode = pObj->data;
sDebug("vgId:%d sync event currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p", sDebug(
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p "
index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); "ahandle:%p",
pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index,
pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
taosHashRemove(pObj->pRespHash, &index, sizeof(index)); taosHashRemove(pObj->pRespHash, &index, sizeof(index));
taosThreadMutexUnlock(&(pObj->mutex)); taosThreadMutexUnlock(&(pObj->mutex));
......
...@@ -141,20 +141,24 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { ...@@ -141,20 +141,24 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send " "lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s", "msg:%s",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, msgStr); pSender->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu", "lastConfigIndex:%ld privateTerm:%lu",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm); pSender->privateTerm);
} }
...@@ -283,29 +287,35 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { ...@@ -283,29 +287,35 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send " "lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s", "msg:%s",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm, msgStr); pSender->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu", "lastConfigIndex:%ld privateTerm:%lu",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm); pSender->privateTerm);
} }
} else { } else {
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d "
"lastApplyIndex:%ld "
"lastApplyTerm:%lu " "lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu", "lastConfigIndex:%ld privateTerm:%lu",
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack,
pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex,
pSender->privateTerm); pSender->privateTerm);
} }
...@@ -339,14 +349,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { ...@@ -339,14 +349,19 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send msg:%s", "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, "privateTerm:%lu send "
pSender->privateTerm, msgStr); "msg:%s",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm,
msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug("vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu", sDebug(
pSender->pSyncNode->vgId, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d "
pSender->ack, pSender->privateTerm); "privateTerm:%lu",
pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex,
pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm);
} }
syncSnapshotSendDestroy(pMsg); syncSnapshotSendDestroy(pMsg);
...@@ -579,17 +594,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -579,17 +594,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld privateTerm:%lu", "lastConfigIndex:%ld privateTerm:%lu",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm);
} }
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
...@@ -602,7 +623,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -602,7 +623,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
// maybe update lastconfig // maybe update lastconfig
if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) { if (pMsg->lastConfigIndex >= SYNC_INDEX_BEGIN) {
int32_t oldReplicaNum = pSyncNode->replicaNum; // int32_t oldReplicaNum = pSyncNode->replicaNum;
SSyncCfg oldSyncCfg = pSyncNode->pRaftCfg->cfg;
// update new config myIndex // update new config myIndex
SSyncCfg newSyncCfg = pMsg->lastConfig; SSyncCfg newSyncCfg = pMsg->lastConfig;
...@@ -626,24 +648,34 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -626,24 +648,34 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
bool isDrop; bool isDrop;
if (IamInNew) { if (IamInNew) {
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld ", "lastConfigIndex:%ld ",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastConfigIndex); pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu do not update config by snapshot, I am not in newCfg, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in "
"newCfg, "
"lastIndex:%ld, lastTerm:%lu, " "lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld ", "lastConfigIndex:%ld ",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastConfigIndex); pSyncNode->pRaftStore->currentTerm, pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex);
} }
// change isStandBy to normal // change isStandBy to normal
if (!isDrop) { if (!isDrop) {
char tmpbuf[128]; char tmpbuf[512];
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d", oldReplicaNum, newSyncCfg.replicaNum); char *oldStr = syncCfg2Str(&oldSyncCfg);
char *newStr = syncCfg2Str(&newSyncCfg);
syncUtilJson2Line(oldStr);
syncUtilJson2Line(newStr);
snprintf(tmpbuf, sizeof(tmpbuf), "config change3 from %d to %d, %s --> %s", oldSyncCfg.replicaNum,
newSyncCfg.replicaNum, oldStr, newStr);
taosMemoryFree(oldStr);
taosMemoryFree(newStr);
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
syncNodeBecomeLeader(pSyncNode, tmpbuf); syncNodeBecomeLeader(pSyncNode, tmpbuf);
} else { } else {
...@@ -662,20 +694,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -662,20 +694,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, "
"snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s", "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex,
logSimpleStr); snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm, logSimpleStr);
taosMemoryFree(logSimpleStr); taosMemoryFree(logSimpleStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d finish, update log begin index:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin "
"index:%ld, "
"snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu", "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
snapshot.lastApplyIndex, snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm); pSyncNode->pRaftStore->currentTerm, host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, snapshot.lastConfigIndex, pReceiver->privateTerm);
} }
pReceiver->pWriter = NULL; pReceiver->pWriter = NULL;
...@@ -686,17 +721,21 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -686,17 +721,21 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu", "lastConfigIndex:%ld, privateTerm:%lu",
pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
} }
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) { } else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
...@@ -711,20 +750,24 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -711,20 +750,24 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv " "lastConfigIndex:%ld, privateTerm:%lu, recv "
"msg:%s", "msg:%s",
pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu", "lastConfigIndex:%ld, privateTerm:%lu",
pReceiver->pSyncNode->vgId, pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastIndex, pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); pReceiver->pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm);
} }
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) { } else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
...@@ -744,19 +787,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { ...@@ -744,19 +787,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if (gRaftDetailLog) { if (gRaftDetailLog) {
char *msgStr = syncSnapshotSend2Str(pMsg); char *msgStr = syncSnapshotSend2Str(pMsg);
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr); pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm, msgStr);
taosMemoryFree(msgStr); taosMemoryFree(msgStr);
} else { } else {
sDebug( sDebug(
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, " "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, "
"lastIndex:%ld, "
"lastTerm:%lu, " "lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu", "lastConfigIndex:%ld, privateTerm:%lu",
pSyncNode->vgId, pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex,
pMsg->lastTerm, pMsg->lastConfigIndex, pReceiver->privateTerm); pSyncNode->pRaftStore->currentTerm, host, port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm,
pMsg->lastConfigIndex, pReceiver->privateTerm);
} }
} else { } else {
......
...@@ -168,14 +168,26 @@ char* syncUtilRaftId2Str(const SRaftId* p) { ...@@ -168,14 +168,26 @@ char* syncUtilRaftId2Str(const SRaftId* p) {
} }
const char* syncUtilState2String(ESyncState state) { const char* syncUtilState2String(ESyncState state) {
/*
if (state == TAOS_SYNC_STATE_FOLLOWER) {
return "TAOS_SYNC_STATE_FOLLOWER";
} else if (state == TAOS_SYNC_STATE_CANDIDATE) {
return "TAOS_SYNC_STATE_CANDIDATE";
} else if (state == TAOS_SYNC_STATE_LEADER) {
return "TAOS_SYNC_STATE_LEADER";
} else {
return "TAOS_SYNC_STATE_UNKNOWN";
}
*/
if (state == TAOS_SYNC_STATE_FOLLOWER) { if (state == TAOS_SYNC_STATE_FOLLOWER) {
return "TAOS_SYNC_STATE_FOLLOWER"; return "follower";
} else if (state == TAOS_SYNC_STATE_CANDIDATE) { } else if (state == TAOS_SYNC_STATE_CANDIDATE) {
return "TAOS_SYNC_STATE_CANDIDATE"; return "candidate";
} else if (state == TAOS_SYNC_STATE_LEADER) { } else if (state == TAOS_SYNC_STATE_LEADER) {
return "TAOS_SYNC_STATE_LEADER"; return "leader";
} else { } else {
return "TAOS_SYNC_STATE_UNKNOWN"; return "state_error";
} }
} }
......
...@@ -282,6 +282,8 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) { ...@@ -282,6 +282,8 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) {
} }
int32_t tfsRmdir(STfs *pTfs, const char *rname) { int32_t tfsRmdir(STfs *pTfs, const char *rname) {
ASSERT(rname[0] != 0);
char aname[TMPNAME_LEN] = "\0"; char aname[TMPNAME_LEN] = "\0";
for (int32_t level = 0; level < pTfs->nlevel; level++) { for (int32_t level = 0; level < pTfs->nlevel; level++) {
...@@ -289,6 +291,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) { ...@@ -289,6 +291,7 @@ int32_t tfsRmdir(STfs *pTfs, const char *rname) {
for (int32_t id = 0; id < pTier->ndisk; id++) { for (int32_t id = 0; id < pTier->ndisk; id++) {
STfsDisk *pDisk = pTier->disks[id]; STfsDisk *pDisk = pTier->disks[id];
snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname); snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname);
uInfo("====> tfs remove dir : path:%s aname:%s rname:[%s]", pDisk->path, aname, rname);
taosRemoveDir(aname); taosRemoveDir(aname);
} }
} }
......
...@@ -277,9 +277,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC, "Invalid topic") ...@@ -277,9 +277,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC, "Invalid topic")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid query") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_QUERY, "Topic with invalid query")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TOPIC_OPTION, "Topic with invalid option")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_EXIST, "Consumer not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED, "Consumer unchanged")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SUBSCRIBE_NOT_EXIST, "Subcribe not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_OFFSET_NOT_EXIST, "Offset not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CONSUMER_NOT_READY, "Consumer not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_SUBSCRIBED, "Topic subscribed cannot be dropped")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CGROUP_USED, "Consumer group being used by some consumer")
// mnode-stream
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_ALREADY_EXIST, "Stream already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_STREAM_NOT_EXIST, "Stream not exist")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_STREAM_OPTION, "Invalid stream option")
......
...@@ -3,6 +3,10 @@ system sh/deploy.sh -n dnode1 -i 1 ...@@ -3,6 +3,10 @@ system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2 system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3 system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4 system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode2 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode3 -c transPullupInterval -v 1
system sh/cfg.sh -n dnode4 -c transPullupInterval -v 1
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sql connect sql connect
...@@ -260,7 +264,7 @@ $x = 0 ...@@ -260,7 +264,7 @@ $x = 0
step92: step92:
$x = $x + 1 $x = $x + 1
sleep 1000 sleep 1000
if $x == 10 then if $x == 20 then
return -1 return -1
endi endi
sql show mnodes sql show mnodes
......
...@@ -33,8 +33,16 @@ import taos ...@@ -33,8 +33,16 @@ import taos
def checkRunTimeError(): def checkRunTimeError():
import win32gui import win32gui
timeCount = 0
while 1: while 1:
time.sleep(1) time.sleep(1)
timeCount = timeCount + 1
if (timeCount>900):
os.system("TASKKILL /F /IM taosd.exe")
os.system("TASKKILL /F /IM taos.exe")
os.system("TASKKILL /F /IM tmq_sim.exe")
os.system("TASKKILL /F /IM mintty.exe")
quit(0)
hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library") hwnd = win32gui.FindWindow(None, "Microsoft Visual C++ Runtime Library")
if hwnd: if hwnd:
os.system("TASKKILL /F /IM taosd.exe") os.system("TASKKILL /F /IM taosd.exe")
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册