提交 aaf5e20f 编写于 作者: M Minghao Li

sync encode test

上级 12c202aa
...@@ -97,14 +97,17 @@ typedef struct SRaftId { ...@@ -97,14 +97,17 @@ typedef struct SRaftId {
} SRaftId; } SRaftId;
typedef struct SSyncNode { typedef struct SSyncNode {
int8_t replica;
int8_t quorum;
int32_t refCount;
int64_t rid;
SyncGroupId vgId; SyncGroupId vgId;
SSyncCfg syncCfg; SSyncCfg syncCfg;
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
SSyncFSM* pFsm;
// passed from outside
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t refCount;
int64_t rid;
SNodeInfo me; SNodeInfo me;
SNodeInfo peers[TSDB_MAX_REPLICA]; SNodeInfo peers[TSDB_MAX_REPLICA];
...@@ -112,7 +115,6 @@ typedef struct SSyncNode { ...@@ -112,7 +115,6 @@ typedef struct SSyncNode {
ESyncRole role; ESyncRole role;
SRaftId raftId; SRaftId raftId;
SSyncFSM* pFsm;
tmr_h pPingTimer; tmr_h pPingTimer;
int32_t pingTimerMS; int32_t pingTimerMS;
...@@ -145,10 +147,6 @@ typedef struct SSyncNode { ...@@ -145,10 +147,6 @@ typedef struct SSyncNode {
int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
// passed from outside
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
} SSyncNode; } SSyncNode;
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
......
...@@ -30,19 +30,26 @@ static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, S ...@@ -30,19 +30,26 @@ static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, S
static int32_t doSyncIODestroy(SSyncIO *io); static int32_t doSyncIODestroy(SSyncIO *io);
static SSyncIO *syncIOCreate(); static SSyncIO *syncIOCreate();
static void * syncIOConsumer(void *param); static void *syncIOConsumer(void *param);
static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
static void syncIODoReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIODoReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIODoRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIODoRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOTick(void *param, void *tmrId); static void syncIOTick(void *param, void *tmrId);
// ---------------------------- // ----------------------------
int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; } int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
pMsg->handle = NULL;
rpcSendRequest(handle, pEpSet, pMsg, NULL);
return 0;
}
int32_t syncIOStart() { int32_t syncIOStart() {
gSyncIO = syncIOCreate(); gSyncIO = syncIOCreate();
assert(gSyncIO != NULL); assert(gSyncIO != NULL);
int32_t ret = doSyncIOStart(gSyncIO);
assert(ret == 0);
return 0; return 0;
} }
...@@ -67,15 +74,14 @@ static void syncIOTick(void *param, void *tmrId) { ...@@ -67,15 +74,14 @@ static void syncIOTick(void *param, void *tmrId) {
taosWriteQitem(io->pMsgQ, pTemp); taosWriteQitem(io->pMsgQ, pTemp);
bool b = taosTmrReset(syncIOTick, 1000, io, io->syncTimerManager, io->syncTimer); taosTmrReset(syncIOTick, 1000, io, io->syncTimerManager, io->syncTimer);
assert(b);
} }
static void *syncIOConsumer(void *param) { static void *syncIOConsumer(void *param) {
SSyncIO *io = param; SSyncIO *io = param;
STaosQall *qall; STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
int type; int type;
qall = taosAllocateQall(); qall = taosAllocateQall();
...@@ -215,8 +221,8 @@ static int32_t doSyncIOStart(SSyncIO *io) { ...@@ -215,8 +221,8 @@ static int32_t doSyncIOStart(SSyncIO *io) {
} }
// start tmr thread // start tmr thread
io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); // io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC");
io->syncTimer = taosTmrStart(syncIOTick, 1000, io, io->syncTimerManager); // io->syncTimer = taosTmrStart(syncIOTick, 1000, io, io->syncTimerManager);
return 0; return 0;
} }
......
...@@ -69,15 +69,34 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -69,15 +69,34 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
memset(pSyncNode, 0, sizeof(SSyncNode)); memset(pSyncNode, 0, sizeof(SSyncNode));
pSyncNode->vgId = pSyncInfo->vgId;
pSyncNode->syncCfg = pSyncInfo->syncCfg;
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
pSyncNode->pFsm = pSyncInfo->pFsm;
pSyncNode->rpcClient = pSyncInfo->rpcClient;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;
int j = 0;
for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) {
if (i != pSyncInfo->syncCfg.myIndex) {
pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i];
j++;
}
}
pSyncNode->role = TAOS_SYNC_STATE_FOLLOWER;
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId);
pSyncNode->pPingTimer = NULL; pSyncNode->pPingTimer = NULL;
pSyncNode->pingTimerMS = 1000; pSyncNode->pingTimerMS = 1000;
atomic_store_8(&pSyncNode->pingTimerStart, 0); atomic_store_8(&pSyncNode->pingTimerStart, 0);
pSyncNode->FpPingTimer = syncNodePingTimerCb; pSyncNode->FpPingTimer = syncNodePingTimerCb;
pSyncNode->pingTimerCounter = 0; pSyncNode->pingTimerCounter = 0;
pSyncNode->rpcClient = pSyncInfo->rpcClient;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->FpOnPing = syncNodeOnPingCb; pSyncNode->FpOnPing = syncNodeOnPingCb;
pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb;
pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb;
...@@ -97,10 +116,11 @@ void syncNodePingAll(SSyncNode* pSyncNode) { ...@@ -97,10 +116,11 @@ void syncNodePingAll(SSyncNode* pSyncNode) {
sTrace("syncNodePingAll %p ", pSyncNode); sTrace("syncNodePingAll %p ", pSyncNode);
int32_t ret = 0; int32_t ret = 0;
for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) { for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) {
SyncPing* pSyncPing; SyncPing* pMsg = syncPingBuild(strlen("ping") + 1);
SRaftId raftId; memcpy(pMsg->data, "ping", strlen("ping") + 1);
syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &raftId); syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &pMsg->destId);
ret = syncNodePing(pSyncNode, &raftId, pSyncPing); pMsg->srcId = pSyncNode->raftId;
ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
assert(ret == 0); assert(ret == 0);
} }
} }
...@@ -118,10 +138,11 @@ void syncNodePingPeers(SSyncNode* pSyncNode) { ...@@ -118,10 +138,11 @@ void syncNodePingPeers(SSyncNode* pSyncNode) {
void syncNodePingSelf(SSyncNode* pSyncNode) { void syncNodePingSelf(SSyncNode* pSyncNode) {
int32_t ret = 0; int32_t ret = 0;
SyncPing* pSyncPing; SyncPing* pMsg = syncPingBuild(strlen("ping") + 1);
SRaftId raftId; memcpy(pMsg->data, "ping", strlen("ping") + 1);
syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &raftId); pMsg->destId = pSyncNode->raftId;
ret = syncNodePing(pSyncNode, &raftId, pSyncPing); pMsg->srcId = pSyncNode->raftId;
ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg);
assert(ret == 0); assert(ret == 0);
} }
...@@ -146,10 +167,10 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { ...@@ -146,10 +167,10 @@ int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
// ------ local funciton --------- // ------ local funciton ---------
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
int32_t ret = 0; int32_t ret = 0;
SRpcMsg* rpcMsg; SRpcMsg rpcMsg;
syncPing2RpcMsg(pMsg, rpcMsg); syncPing2RpcMsg(pMsg, &rpcMsg);
syncNodeSendMsgById(destRaftId, pSyncNode, rpcMsg); syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
return ret; return ret;
} }
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { uint64_t syncUtilAddr2U64(const char* host, uint16_t port) {
uint64_t u64; uint64_t u64;
uint32_t hostU32 = (uint32_t)inet_addr(host); uint32_t hostU32 = (uint32_t)inet_addr(host);
assert(hostU32 != (uint32_t)-1); // assert(hostU32 != (uint32_t)-1);
u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16); u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16);
return u64; return u64;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册