未验证 提交 ebc17283 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17974 from taosdata/fix/TD-20052

refact: adjust sync resp mgr
......@@ -41,13 +41,12 @@ typedef struct SSyncRespMgr {
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl);
void syncRespMgrDestroy(SSyncRespMgr *pObj);
int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub);
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index);
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub);
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub);
uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub);
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t seq);
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub);
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *pInfo);
void syncRespClean(SSyncRespMgr *pObj);
void syncRespCleanRsp(SSyncRespMgr *pObj);
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp);
#ifdef __cplusplus
}
......
......@@ -26,9 +26,6 @@ typedef struct SRaftId {
SyncGroupId vgId;
} SRaftId;
// for compatibility, the same as syncPropose
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak);
// ------------------ for debug -------------------
void syncRpcMsgPrint(SRpcMsg* pMsg);
void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
......
......@@ -452,11 +452,6 @@ int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) {
return ret;
}
int32_t syncForwardToPeer(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
int32_t ret = syncPropose(rid, pMsg, isWeak);
return ret;
}
SSyncState syncGetState(int64_t rid) {
SSyncState state = {.state = TAOS_SYNC_STATE_ERROR};
......@@ -558,109 +553,27 @@ SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapsho
return lastIndex;
}
#if 0
SyncTerm syncGetMyTerm(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR;
}
ASSERT(rid == pSyncNode->rid);
SyncTerm term = pSyncNode->pRaftStore->currentTerm;
syncNodeRelease(pSyncNode);
return term;
}
SyncIndex syncGetLastIndex(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return SYNC_INDEX_INVALID;
}
ASSERT(rid == pSyncNode->rid);
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
syncNodeRelease(pSyncNode);
return lastIndex;
}
SyncIndex syncGetCommitIndex(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return SYNC_INDEX_INVALID;
}
ASSERT(rid == pSyncNode->rid);
SyncIndex cmtIndex = pSyncNode->commitIndex;
syncNodeRelease(pSyncNode);
return cmtIndex;
}
SyncGroupId syncGetVgId(int64_t rid) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
return TAOS_SYNC_STATE_ERROR;
}
ASSERT(rid == pSyncNode->rid);
SyncGroupId vgId = pSyncNode->vgId;
syncNodeRelease(pSyncNode);
return vgId;
}
void syncGetEpSet(int64_t rid, SEpSet* pEpSet) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
memset(pEpSet, 0, sizeof(*pEpSet));
return;
}
ASSERT(rid == pSyncNode->rid);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
pEpSet->numOfEps = 0;
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
(pEpSet->numOfEps)++;
sInfo("vgId:%d, sync get epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port);
}
pEpSet->inUse = pSyncNode->pRaftCfg->cfg.myIndex;
sInfo("vgId:%d, sync get epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
syncNodeRelease(pSyncNode);
}
#endif
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
memset(pEpSet, 0, sizeof(*pEpSet));
return;
}
if (pSyncNode == NULL) return;
pEpSet->numOfEps = 0;
for (int32_t i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) {
snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn);
pEpSet->eps[i].port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
(pEpSet->numOfEps)++;
sInfo("vgId:%d, sync get retry epset: index:%d %s:%d", pSyncNode->vgId, i, pEpSet->eps[i].fqdn,
pEpSet->eps[i].port);
SEp* pEp = &pEpSet->eps[i];
tstrncpy(pEp->fqdn, pSyncNode->pRaftCfg->cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
pEp->port = (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodePort;
pEpSet->numOfEps++;
sInfo("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
}
if (pEpSet->numOfEps > 0) {
pEpSet->inUse = (pSyncNode->pRaftCfg->cfg.myIndex + 1) % pEpSet->numOfEps;
}
sInfo("vgId:%d, sync get retry epset in-use:%d", pSyncNode->vgId, pEpSet->inUse);
sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
syncNodeRelease(pSyncNode);
}
static void syncGetAndDelRespRpc(SSyncNode* pSyncNode, uint64_t index, SRpcHandleInfo* pInfo) {
SRespStub stub;
int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub);
if (ret == 1) {
*pInfo = stub.rpcMsg.info;
}
sTrace("vgId:%d, get seq:%" PRIu64 " rpc handle:%p", pSyncNode->vgId, index, pInfo->handle);
}
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
SSyncNode* pSyncNode = syncNodeAcquire(rid);
if (pSyncNode == NULL) {
......@@ -2759,7 +2672,7 @@ int32_t syncNodeDoCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endInde
.flag = flag,
};
syncGetAndDelRespRpc(ths, cbMeta.seqNum, &rpcMsg.info);
syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
}
}
......
......@@ -13,21 +13,22 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "syncRespMgr.h"
#include "syncRaftEntry.h"
#include "syncRaftStore.h"
SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
SSyncRespMgr *pObj = (SSyncRespMgr *)taosMemoryMalloc(sizeof(SSyncRespMgr));
SSyncRespMgr *pObj = taosMemoryCalloc(1, sizeof(SSyncRespMgr));
if (pObj == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memset(pObj, 0, sizeof(SSyncRespMgr));
pObj->pRespHash =
taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
ASSERT(pObj->pRespHash != NULL);
if (pObj->pRespHash == NULL) return NULL;
pObj->ttl = ttl;
pObj->data = data;
pObj->seqNum = 0;
......@@ -38,93 +39,84 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
if (pObj != NULL) {
taosThreadMutexLock(&(pObj->mutex));
taosThreadMutexLock(&pObj->mutex);
taosHashCleanup(pObj->pRespHash);
taosThreadMutexUnlock(&(pObj->mutex));
taosThreadMutexUnlock(&pObj->mutex);
taosThreadMutexDestroy(&(pObj->mutex));
taosMemoryFree(pObj);
}
}
int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
taosThreadMutexLock(&(pObj->mutex));
uint64_t syncRespMgrAdd(SSyncRespMgr *pObj, const SRespStub *pStub) {
taosThreadMutexLock(&pObj->mutex);
uint64_t keyCode = ++(pObj->seqNum);
taosHashPut(pObj->pRespHash, &keyCode, sizeof(keyCode), pStub, sizeof(SRespStub));
uint64_t seq = ++(pObj->seqNum);
int32_t code = taosHashPut(pObj->pRespHash, &seq, sizeof(uint64_t), pStub, sizeof(SRespStub));
sNTrace(pObj->data, "save message handle:%p, type:%s seq:%" PRIu64 " code:0x%x", pStub->rpcMsg.info.handle,
TMSG_INFO(pStub->rpcMsg.msgType), seq, code);
sNTrace(pObj->data, "save message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType),
keyCode, pStub->rpcMsg.info.handle);
taosThreadMutexUnlock(&(pObj->mutex));
return keyCode;
taosThreadMutexUnlock(&pObj->mutex);
return seq;
}
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t index) {
taosThreadMutexLock(&(pObj->mutex));
int32_t syncRespMgrDel(SSyncRespMgr *pObj, uint64_t seq) {
taosThreadMutexLock(&pObj->mutex);
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
int32_t code = taosHashRemove(pObj->pRespHash, &seq, sizeof(seq));
sNTrace(pObj->data, "remove message handle, seq:%" PRIu64 " code:%d", seq, code);
taosThreadMutexUnlock(&(pObj->mutex));
return 0;
taosThreadMutexUnlock(&pObj->mutex);
return code;
}
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
taosThreadMutexLock(&(pObj->mutex));
int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub) {
taosThreadMutexLock(&pObj->mutex);
void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index));
SRespStub *pTmp = taosHashGet(pObj->pRespHash, &seq, sizeof(uint64_t));
if (pTmp != NULL) {
memcpy(pStub, pTmp, sizeof(SRespStub));
sNTrace(pObj->data, "get message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType), seq,
pStub->rpcMsg.info.handle);
sNTrace(pObj->data, "get message handle, type:%s seq:%" PRIu64 " handle:%p", TMSG_INFO(pStub->rpcMsg.msgType),
index, pStub->rpcMsg.info.handle);
taosThreadMutexUnlock(&(pObj->mutex));
taosThreadMutexUnlock(&pObj->mutex);
return 1; // get one object
}
taosThreadMutexUnlock(&(pObj->mutex));
taosThreadMutexUnlock(&pObj->mutex);
return 0; // get none object
}
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
taosThreadMutexLock(&(pObj->mutex));
int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *pInfo) {
taosThreadMutexLock(&pObj->mutex);
void *pTmp = taosHashGet(pObj->pRespHash, &index, sizeof(index));
if (pTmp != NULL) {
memcpy(pStub, pTmp, sizeof(SRespStub));
SRespStub *pStub = taosHashGet(pObj->pRespHash, &seq, sizeof(uint64_t));
if (pStub != NULL) {
*pInfo = pStub->rpcMsg.info;
sNTrace(pObj->data, "get-and-del message handle:%p, type:%s seq:%" PRIu64, pStub->rpcMsg.info.handle,
TMSG_INFO(pStub->rpcMsg.msgType), seq);
taosHashRemove(pObj->pRespHash, &seq, sizeof(uint64_t));
sNTrace(pObj->data, "get-and-del message handle, type:%s seq:%" PRIu64 " handle:%p",
TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle);
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
taosThreadMutexUnlock(&(pObj->mutex));
taosThreadMutexUnlock(&pObj->mutex);
return 1; // get one object
}
taosThreadMutexUnlock(&(pObj->mutex));
return 0; // get none object
}
void syncRespCleanRsp(SSyncRespMgr *pObj) {
taosThreadMutexLock(&(pObj->mutex));
syncRespCleanByTTL(pObj, -1, true);
taosThreadMutexUnlock(&(pObj->mutex));
}
void syncRespClean(SSyncRespMgr *pObj) {
taosThreadMutexLock(&(pObj->mutex));
syncRespCleanByTTL(pObj, pObj->ttl, false);
taosThreadMutexUnlock(&(pObj->mutex));
taosThreadMutexUnlock(&pObj->mutex);
return 0; // get none object
}
void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
SRespStub *pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, NULL);
int cnt = 0;
int sum = 0;
SSyncNode *pSyncNode = pObj->data;
SArray *delIndexArray = taosArrayInit(0, sizeof(uint64_t));
ASSERT(delIndexArray != NULL);
sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId);
SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
if (delIndexArray == NULL) return;
sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId);
while (pStub) {
size_t len;
void * key = taosHashGetKey(pStub, &len);
void *key = taosHashGetKey(pStub, &len);
uint64_t *pSeqNum = (uint64_t *)key;
sum++;
......@@ -149,15 +141,15 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
pStub->rpcMsg.contLen = 0;
// TODO: and make rpcMsg body, call commit cb
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &pStub->rpcMsg, cbMeta);
pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
if (pStub->rpcMsg.info.handle != NULL) {
tmsgSendRsp(&(pStub->rpcMsg));
tmsgSendRsp(&pStub->rpcMsg);
}
}
pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);
pStub = taosHashIterate(pObj->pRespHash, pStub);
}
int32_t arraySize = taosArrayGetSize(delIndexArray);
......@@ -170,3 +162,15 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
}
taosArrayDestroy(delIndexArray);
}
void syncRespCleanRsp(SSyncRespMgr *pObj) {
taosThreadMutexLock(&pObj->mutex);
syncRespCleanByTTL(pObj, -1, true);
taosThreadMutexUnlock(&pObj->mutex);
}
void syncRespClean(SSyncRespMgr *pObj) {
taosThreadMutexLock(&pObj->mutex);
syncRespCleanByTTL(pObj, pObj->ttl, false);
taosThreadMutexUnlock(&pObj->mutex);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册