提交 0161b645 编写于 作者: S Shengliang Guan

refactor: adjust msgcb

上级 366d0f6d
...@@ -20,14 +20,11 @@ ...@@ -20,14 +20,11 @@
extern "C" { extern "C" {
#endif #endif
#include <stdbool.h> #include "os.h"
#include <stdint.h>
//#include <tdatablock.h>
#include "cJSON.h" #include "cJSON.h"
#include "tdef.h" #include "tdef.h"
//#include "taosdef.h" #include "tmsgcb.h"
//#include "trpc.h"
//#include "wal.h"
typedef uint64_t SyncNodeId; typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;
...@@ -132,11 +129,10 @@ typedef struct SSyncInfo { ...@@ -132,11 +129,10 @@ typedef struct SSyncInfo {
char path[TSDB_FILENAME_LEN]; char path[TSDB_FILENAME_LEN];
SWal* pWal; SWal* pWal;
SSyncFSM* pFsm; SSyncFSM* pFsm;
SMsgCb* msgcb;
void* rpcClient; int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
void* queue;
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
} SSyncInfo; } SSyncInfo;
......
...@@ -20,13 +20,10 @@ ...@@ -20,13 +20,10 @@
extern "C" { extern "C" {
#endif #endif
#include <stdbool.h> #include "os.h"
#include <stdint.h>
//#include <tdatablock.h>
#include "cJSON.h" #include "cJSON.h"
//#include "taosdef.h"
#include "trpc.h" #include "trpc.h"
//#include "wal.h"
// ------------------ ds ------------------- // ------------------ ds -------------------
typedef struct SRaftId { typedef struct SRaftId {
...@@ -43,8 +40,7 @@ void syncNodeRelease(SSyncNode* pNode); ...@@ -43,8 +40,7 @@ void syncNodeRelease(SSyncNode* pNode);
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg);
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg); int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg);
void syncSetQ(int64_t rid, void* queueHandle); void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb);
void syncSetRpc(int64_t rid, void* rpcHandle);
char* sync2SimpleStr(int64_t rid); char* sync2SimpleStr(int64_t rid);
// set timer ms // set timer ms
......
...@@ -85,10 +85,9 @@ int vnodeAsyncCommit(SVnode* pVnode); ...@@ -85,10 +85,9 @@ int vnodeAsyncCommit(SVnode* pVnode);
int32_t vnodeSyncOpen(SVnode* pVnode, char* path); int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
int32_t vnodeSyncStart(SVnode* pVnode); int32_t vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode); void vnodeSyncClose(SVnode* pVnode);
void vnodeSyncSetQ(SVnode* pVnode, void* qHandle); void vnodeSyncSetMsgCb(SVnode* pVnode);
void vnodeSyncSetRpc(SVnode* pVnode, void* rpcHandle); int32_t vnodeSyncEqMsg(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t vnodeSyncEqMsg(void* qHandle, SRpcMsg* pMsg); int32_t vnodeSyncSendMsg(const SEpSet* pEpSet, SRpcMsg* pMsg);
int32_t vnodeSendMsg(void* rpcHandle, const SEpSet* pEpSet, SRpcMsg* pMsg);
void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
......
...@@ -178,8 +178,7 @@ void vnodeClose(SVnode *pVnode) { ...@@ -178,8 +178,7 @@ void vnodeClose(SVnode *pVnode) {
// start the sync timer after the queue is ready // start the sync timer after the queue is ready
int32_t vnodeStart(SVnode *pVnode) { int32_t vnodeStart(SVnode *pVnode) {
vnodeSyncSetQ(pVnode, NULL); vnodeSyncSetMsgCb(pVnode);
vnodeSyncSetRpc(pVnode, NULL);
vnodeSyncStart(pVnode); vnodeSyncStart(pVnode);
return 0; return 0;
} }
......
...@@ -27,9 +27,8 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) { ...@@ -27,9 +27,8 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
syncInfo.pWal = pVnode->pWal; syncInfo.pWal = pVnode->pWal;
syncInfo.pFsm = syncVnodeMakeFsm(pVnode); syncInfo.pFsm = syncVnodeMakeFsm(pVnode);
syncInfo.rpcClient = NULL; syncInfo.msgcb = NULL;
syncInfo.FpSendMsg = vnodeSendMsg; syncInfo.FpSendMsg = vnodeSyncSendMsg;
syncInfo.queue = NULL;
syncInfo.FpEqMsg = vnodeSyncEqMsg; syncInfo.FpEqMsg = vnodeSyncEqMsg;
pVnode->sync = syncOpen(&syncInfo); pVnode->sync = syncOpen(&syncInfo);
...@@ -53,31 +52,13 @@ void vnodeSyncClose(SVnode *pVnode) { ...@@ -53,31 +52,13 @@ void vnodeSyncClose(SVnode *pVnode) {
syncStop(pVnode->sync); syncStop(pVnode->sync);
} }
void vnodeSyncSetQ(SVnode *pVnode, void *qHandle) { syncSetQ(pVnode->sync, (void *)(&(pVnode->msgCb))); } void vnodeSyncSetMsgCb(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); }
void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle) { syncSetRpc(pVnode->sync, (void *)(&(pVnode->msgCb))); } int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
int32_t vnodeSyncEqMsg(void *qHandle, SRpcMsg *pMsg) { int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t ret = 0; pMsg->info.noResp = 1;
SMsgCb *pMsgCb = qHandle; return tmsgSendReq(pEpSet, pMsg);
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
tmsgPutToQueue(qHandle, SYNC_QUEUE, pMsg);
} else {
vError("vnodeSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
}
return ret;
}
int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
int32_t ret = 0;
SMsgCb *pMsgCb = rpcHandle;
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
pMsg->info.noResp = 1;
tmsgSendReq(pEpSet, pMsg);
} else {
vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
}
return ret;
} }
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
......
...@@ -36,10 +36,10 @@ typedef struct SSyncIO { ...@@ -36,10 +36,10 @@ typedef struct SSyncIO {
STaosQueue *pMsgQ; STaosQueue *pMsgQ;
STaosQset * pQset; STaosQset * pQset;
TdThread consumerTid; TdThread consumerTid;
void *serverRpc;
void * serverRpc; void *clientRpc;
void * clientRpc; SEpSet myAddr;
SEpSet myAddr; SMsgCb msgcb;
tmr_h qTimer; tmr_h qTimer;
int32_t qTimerMS; int32_t qTimerMS;
...@@ -65,8 +65,8 @@ extern SSyncIO *gSyncIO; ...@@ -65,8 +65,8 @@ extern SSyncIO *gSyncIO;
int32_t syncIOStart(char *host, uint16_t port); int32_t syncIOStart(char *host, uint16_t port);
int32_t syncIOStop(); int32_t syncIOStop();
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg); int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
int32_t syncIOQTimerStart(); int32_t syncIOQTimerStart();
int32_t syncIOQTimerStop(); int32_t syncIOQTimerStop();
......
...@@ -159,11 +159,10 @@ typedef struct SSyncNode { ...@@ -159,11 +159,10 @@ typedef struct SSyncNode {
char configPath[TSDB_FILENAME_LEN * 2]; char configPath[TSDB_FILENAME_LEN * 2];
// sync io // sync io
SWal* pWal; SWal* pWal;
void* rpcClient; const SMsgCb* msgcb;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
void* queue; int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
// init internal // init internal
SNodeInfo myNodeInfo; SNodeInfo myNodeInfo;
......
...@@ -66,7 +66,7 @@ int32_t syncIOStop() { ...@@ -66,7 +66,7 @@ int32_t syncIOStop() {
return ret; return ret;
} }
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
assert(pEpSet->inUse == 0); assert(pEpSet->inUse == 0);
assert(pEpSet->numOfEps == 1); assert(pEpSet->numOfEps == 1);
...@@ -83,11 +83,11 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { ...@@ -83,11 +83,11 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
pMsg->info.handle = NULL; pMsg->info.handle = NULL;
pMsg->info.noResp = 1; pMsg->info.noResp = 1;
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); rpcSendRequest(gSyncIO->clientRpc, pEpSet, pMsg, NULL);
return ret; return ret;
} }
int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
int32_t ret = 0; int32_t ret = 0;
char logBuf[128]; char logBuf[128];
syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg); syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg);
...@@ -96,7 +96,7 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { ...@@ -96,7 +96,7 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM); pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
memcpy(pTemp, pMsg, sizeof(SRpcMsg)); memcpy(pTemp, pMsg, sizeof(SRpcMsg));
STaosQueue *pMsgQ = queue; STaosQueue *pMsgQ = gSyncIO->pMsgQ;
taosWriteQitem(pMsgQ, pTemp); taosWriteQitem(pMsgQ, pTemp);
return ret; return ret;
......
...@@ -240,26 +240,14 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { ...@@ -240,26 +240,14 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
return ret; return ret;
} }
void syncSetQ(int64_t rid, void* queue) { void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid); sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid);
return; return;
} }
assert(rid == pSyncNode->rid); assert(rid == pSyncNode->rid);
pSyncNode->queue = queue; pSyncNode->msgcb = msgcb;
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
}
void syncSetRpc(int64_t rid, void* rpcHandle) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
sTrace("syncSetRpc get pSyncNode is NULL, rid:%ld", rid);
return;
}
assert(rid == pSyncNode->rid);
pSyncNode->rpcClient = rpcHandle;
taosReleaseRef(tsNodeRefId, pSyncNode->rid); taosReleaseRef(tsNodeRefId, pSyncNode->rid);
} }
...@@ -332,7 +320,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { ...@@ -332,7 +320,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->FpEqMsg != NULL) {
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
} else { } else {
sTrace("syncPropose pSyncNode->FpEqMsg is NULL"); sTrace("syncPropose pSyncNode->FpEqMsg is NULL");
} }
...@@ -375,9 +363,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { ...@@ -375,9 +363,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path); snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);
pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->pWal = pSyncInfo->pWal;
pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->msgcb = pSyncInfo->msgcb;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->queue = pSyncInfo->queue;
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
// init raft config // init raft config
...@@ -691,7 +678,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp ...@@ -691,7 +678,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
// htonl // htonl
syncUtilMsgHtoN(pMsg->pCont); syncUtilMsgHtoN(pMsg->pCont);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); pSyncNode->FpSendMsg(&epSet, pMsg);
} else { } else {
sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL"); sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
} }
...@@ -706,7 +693,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S ...@@ -706,7 +693,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
// htonl // htonl
syncUtilMsgHtoN(pMsg->pCont); syncUtilMsgHtoN(pMsg->pCont);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); pSyncNode->FpSendMsg(&epSet, pMsg);
} else { } else {
sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL"); sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
} }
...@@ -728,12 +715,12 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { ...@@ -728,12 +715,12 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf); cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg);
cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf); cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->queue); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
cJSON_AddStringToObject(pRoot, "queue", u64buf); cJSON_AddStringToObject(pRoot, "queue", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg);
cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf); cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);
...@@ -1095,7 +1082,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { ...@@ -1095,7 +1082,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->FpEqMsg != NULL) {
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
} else { } else {
sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL"); sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL");
} }
...@@ -1118,7 +1105,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { ...@@ -1118,7 +1105,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->FpEqMsg != NULL) {
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
} else { } else {
sTrace("syncNodeEqElectTimer pSyncNode->FpEqMsg is NULL"); sTrace("syncNodeEqElectTimer pSyncNode->FpEqMsg is NULL");
} }
...@@ -1145,7 +1132,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { ...@@ -1145,7 +1132,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg); syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
if (pSyncNode->FpEqMsg != NULL) { if (pSyncNode->FpEqMsg != NULL) {
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
} else { } else {
sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL"); sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL");
} }
...@@ -1175,10 +1162,10 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) { ...@@ -1175,10 +1162,10 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
assert(pSyncMsg->dataLen == entryLen); assert(pSyncMsg->dataLen == entryLen);
memcpy(pSyncMsg->data, serialized, entryLen); memcpy(pSyncMsg->data, serialized, entryLen);
SRpcMsg rpcMsg; SRpcMsg rpcMsg = {0};
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
if (ths->FpEqMsg != NULL) { if (ths->FpEqMsg != NULL) {
ths->FpEqMsg(ths->queue, &rpcMsg); ths->FpEqMsg(ths->msgcb, &rpcMsg);
} else { } else {
sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL"); sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL");
} }
......
...@@ -100,9 +100,8 @@ SWal* createWal(char* path, int32_t vgId) { ...@@ -100,9 +100,8 @@ SWal* createWal(char* path, int32_t vgId) {
int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy) { int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy) {
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm(); syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
......
...@@ -44,9 +44,8 @@ SWal* createWal(char* path, int32_t vgId) { ...@@ -44,9 +44,8 @@ SWal* createWal(char* path, int32_t vgId) {
SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path) { SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path) {
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = NULL; syncInfo.pFsm = NULL;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
......
...@@ -31,9 +31,8 @@ SSyncNode *pSyncNode; ...@@ -31,9 +31,8 @@ SSyncNode *pSyncNode;
SSyncNode *syncNodeInit() { SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -25,9 +25,7 @@ SSyncFSM* pFsm; ...@@ -25,9 +25,7 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
...@@ -99,7 +97,7 @@ int main(int argc, char** argv) { ...@@ -99,7 +97,7 @@ int main(int argc, char** argv) {
SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncEnqTest"); SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncEnqTest");
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
} }
......
...@@ -43,7 +43,7 @@ int main() { ...@@ -43,7 +43,7 @@ int main() {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
syncIOSendMsg(gSyncIO->clientRpc, &epSet, &rpcMsg); syncIOSendMsg(&epSet, &rpcMsg);
taosSsleep(1); taosSsleep(1);
} }
......
...@@ -25,9 +25,8 @@ SSyncFSM* pFsm; ...@@ -25,9 +25,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
...@@ -103,7 +102,7 @@ int main(int argc, char** argv) { ...@@ -103,7 +102,7 @@ int main(int argc, char** argv) {
SEpSet epSet; SEpSet epSet;
syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet); syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, &rpcMsg); pSyncNode->FpSendMsg(&epSet, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
} }
......
...@@ -28,9 +28,8 @@ SSyncNode* pSyncNode; ...@@ -28,9 +28,8 @@ SSyncNode* pSyncNode;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -25,9 +25,8 @@ SSyncFSM* pFsm; ...@@ -25,9 +25,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./sync_init_test"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./sync_init_test");
......
...@@ -25,9 +25,8 @@ SSyncFSM* pFsm; ...@@ -25,9 +25,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -25,9 +25,8 @@ SSyncFSM* pFsm; ...@@ -25,9 +25,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -25,9 +25,8 @@ SSyncFSM* pFsm; ...@@ -25,9 +25,8 @@ SSyncFSM* pFsm;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -97,9 +97,8 @@ SWal* createWal(char* path, int32_t vgId) { ...@@ -97,9 +97,8 @@ SWal* createWal(char* path, int32_t vgId) {
int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path) { int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path) {
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = vgId; syncInfo.vgId = vgId;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = createFsm(); syncInfo.pFsm = createFsm();
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
......
...@@ -83,9 +83,8 @@ void initFsm() { ...@@ -83,9 +83,8 @@ void initFsm() {
SSyncNode *syncNodeInit() { SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir);
...@@ -200,7 +199,7 @@ int main(int argc, char **argv) { ...@@ -200,7 +199,7 @@ int main(int argc, char **argv) {
SyncClientRequest *pSyncClientRequest = pMsg1; SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg); gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
} }
......
...@@ -27,9 +27,8 @@ SSyncNode* pSyncNode; ...@@ -27,9 +27,8 @@ SSyncNode* pSyncNode;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -27,9 +27,8 @@ SSyncNode* pSyncNode; ...@@ -27,9 +27,8 @@ SSyncNode* pSyncNode;
SSyncNode* syncNodeInit() { SSyncNode* syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
......
...@@ -62,9 +62,8 @@ void initFsm() { ...@@ -62,9 +62,8 @@ void initFsm() {
SSyncNode *syncNodeInit() { SSyncNode *syncNodeInit() {
syncInfo.vgId = 1234; syncInfo.vgId = 1234;
syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.msgcb = &gSyncIO->msgcb;
syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm; syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir); snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir);
...@@ -178,7 +177,7 @@ int main(int argc, char **argv) { ...@@ -178,7 +177,7 @@ int main(int argc, char **argv) {
SyncClientRequest *pSyncClientRequest = pMsg1; SyncClientRequest *pSyncClientRequest = pMsg1;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg); syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg); gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg);
taosMsleep(1000); taosMsleep(1000);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册